Repository: flink
Updated Branches:
  refs/heads/master 2522f028b -> 84e76f4d3


http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
new file mode 100644
index 0000000..5976799
--- /dev/null
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheudler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOITCase extends YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
+
+       /*
+       Override init with FIFO scheduler.
+        */
+       @BeforeClass
+       public static void setup() {
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
FifoScheduler.class, ResourceScheduler.class);
+               yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
+               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
+               startYARNWithConfig(yarnConfiguration);
+       }
+       /**
+        * Test regular operation, including command line parameter parsing.
+        */
+       @Test
+       public void testClientStartup() {
+               LOG.info("Starting testClientStartup()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "512",
+                                               "-tm", "1024"},
+                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION);
+               LOG.info("Finished testClientStartup()");
+               ensureNoExceptionsInLogFiles();
+       }
+
+
+       /**
+        * Test querying the YARN cluster.
+        *
+        * This test validates through 666*2 cores in the "cluster".
+        */
+       @Test
+       public void testQueryCluster() {
+               LOG.info("Starting testQueryCluster()");
+               runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 
totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
+               LOG.info("Finished testQueryCluster()");
+               ensureNoExceptionsInLogFiles();
+       }
+
+       /**
+        * Test deployment to non-existing queue. (user-reported error)
+        * Deployment to the queue is possible because there are no queues, so 
we don't check.
+        */
+       @Test
+       public void testNonexistingQueue() {
+               LOG.info("Starting testNonexistingQueue()");
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "512",
+                               "-tm", "1024",
+                               "-qu", "doesntExist"}, "Number of connected 
TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+               LOG.info("Finished testNonexistingQueue()");
+               ensureNoExceptionsInLogFiles();
+       }
+
+       /**
+        * Test requesting more resources than available.
+        */
+       @Test
+       public void testMoreNodesThanAvailable() {
+               if(ignoreOnTravis()) {
+                       return;
+               }
+               addTestAppender();
+               LOG.info("Starting testMoreNodesThanAvailable()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "10",
+                               "-jm", "512",
+                               "-tm", "1024"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends 
on the speed of the test hardware
+               LOG.info("Finished testMoreNodesThanAvailable()");
+               checkForLogString("This YARN session requires 10752MB of memory 
in the cluster. There are currently only 8192MB available.");
+               ensureNoExceptionsInLogFiles();
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * We allocate:
+        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
+        * 5 TaskManagers with 1585 MB
+        *
+        * user sees a total request of: 8181 MB (fits)
+        * system sees a total request of: 8437 (doesn't fit due to min alloc 
mb)
+        */
+       @Test
+       public void testResourceComputation() {
+               if(ignoreOnTravis()) {
+                       return;
+               }
+               addTestAppender();
+               LOG.info("Starting testResourceComputation()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "5",
+                               "-jm", "256",
+                               "-tm", "1585"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION);
+               LOG.info("Finished testResourceComputation()");
+               checkForLogString("This YARN session requires 8437MB of memory 
in the cluster. There are currently only 8192MB available.");
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * We allocate:
+        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
+        * 2 TaskManagers with 3840 MB
+        *
+        * the user sees a total request of: 7936 MB (fits)
+        * the system sees a request of: 8192 MB (fits)
+        * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which 
doesn't fit.
+        *
+        * --> check if the system properly rejects allocating this session.
+        */
+       @Test
+       public void testfullAlloc() {
+               if(ignoreOnTravis()) {
+                       return;
+               }
+               addTestAppender();
+               LOG.info("Starting testfullAlloc()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "2",
+                               "-jm", "256",
+                               "-tm", "3840"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION);
+               LOG.info("Finished testfullAlloc()");
+               checkForLogString("There is not enough memory available in the 
YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: 
[4096, 4096]\n" +
+                               "After allocating the JobManager (512MB) and 
(1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
+               ensureNoExceptionsInLogFiles();
+       }
+
+       /**
+        * Test per-job yarn cluster
+        *
+        * This also tests the prefixed CliFrontend options for the YARN case
+        */
+       @Test
+       public void perJobYarnCluster() {
+               LOG.info("Starting perJobYarnCluster()");
+               File exampleJarLocation = YarnTestBase.findFile("..", new 
ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount 
here.
+               Assert.assertNotNull("Could not find wordcount jar", 
exampleJarLocation);
+               runWithArgs(new String[] {"run", "-m", "yarn-cluster",
+                               "-yj", flinkUberjar.getAbsolutePath(),
+                               "-yn", "1",
+                               "-yjm", "512",
+                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()}, "Job execution switched to status 
FINISHED.", RunTypes.CLI_FRONTEND);
+               LOG.info("Finished perJobYarnCluster()");
+               ensureNoExceptionsInLogFiles();
+       }
+
+       /**
+        * Test the YARN Java API
+        */
+       @Test
+       public void testJavaAPI() {
+               final int WAIT_TIME = 15;
+               LOG.info("Starting testJavaAPI()");
+
+               AbstractFlinkYarnClient flinkYarnClient = 
FlinkYarnSessionCli.getFlinkYarnClient();
+               flinkYarnClient.setTaskManagerCount(1);
+               flinkYarnClient.setJobManagerMemory(512);
+               flinkYarnClient.setTaskManagerMemory(512);
+               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
+               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+               
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
+
+               // deploy
+               AbstractFlinkYarnCluster yarnCluster = null;
+               try {
+                       yarnCluster = flinkYarnClient.deploy(null);
+               } catch (Exception e) {
+                       System.err.println("Error while deploying YARN cluster: 
"+e.getMessage());
+                       e.printStackTrace(System.err);
+                       Assert.fail();
+               }
+               FlinkYarnClusterStatus expectedStatus = new 
FlinkYarnClusterStatus(1, 1);
+               for(int second = 0; second < WAIT_TIME * 2; second++) { // run 
"forever"
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               LOG.warn("Interrupted", e);
+                               Thread.interrupted();
+                       }
+                       FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
+                       if(status != null && status.equals(expectedStatus)) {
+                               LOG.info("Cluster reached status " + status);
+                               break; // all good, cluster started
+                       }
+                       if(second > WAIT_TIME) {
+                               // we waited for 15 seconds. cluster didn't 
come up correctly
+                               Assert.fail("The custer didn't start after " + 
WAIT_TIME + " seconds");
+                       }
+               }
+
+               // use the cluster
+               Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+               Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+               LOG.info("Shutting down cluster. All tests passed");
+               // shutdown cluster
+               yarnCluster.shutdown();
+               LOG.info("Finished testJavaAPI()");
+
+               ensureNoExceptionsInLogFiles();
+       }
+
+       public boolean ignoreOnTravis() {
+               if(System.getenv("TRAVIS") != null && 
System.getenv("TRAVIS").equals("true")) {
+                       // we skip the test until we are able to start a 
smaller yarn clsuter
+                       // right now, the miniyarncluster has the size of the 
nodemanagers fixed on 4 GBs.
+                       LOG.warn("Skipping test on travis for now");
+                       return true;
+               }
+               return false;
+       }
+
+       //
+       // --------------- Tools to test if a certain string has been logged 
with Log4j. -------------
+       // See :  
http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
+       //
+       private static TestAppender testAppender;
+       public static void addTestAppender() {
+               testAppender = new TestAppender();
+               
org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
+       }
+
+       public static void checkForLogString(String expected) {
+               if(testAppender == null) {
+                       throw new NullPointerException("Initialize it first");
+               }
+               LoggingEvent found = null;
+               for(LoggingEvent event: testAppender.events) {
+                       if(event.getMessage().toString().contains(expected)) {
+                               found = event;
+                               break;
+                       }
+               }
+               if(found != null) {
+                       LOG.info("Found expected string '"+expected+"' in log 
message "+found);
+                       return;
+               }
+               Assert.fail("Unable to find expected string '"+expected+"' in 
log messages");
+       }
+
+       public static class TestAppender extends AppenderSkeleton {
+               public List<LoggingEvent> events = new 
ArrayList<LoggingEvent>();
+               public void close() {}
+               public boolean requiresLayout() {return false;}
+               @Override
+               protected void append(LoggingEvent event) {
+                       events.add(event);
+               }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
new file mode 100644
index 0000000..200205d
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+
+/**
+ * This base class allows to use the MiniYARNCluster.
+ * The cluster is re-used for all tests.
+ *
+ * This class is located in a different package which is build after 
flink-dist. This way,
+ * we can use the YARN uberjar of flink to start a Flink YARN session.
+ */
+public abstract class YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnTestBase.class);
+
+       private final static PrintStream originalStdout = System.out;
+       private final static PrintStream originalStderr = System.err;
+
+       private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
+
+
+       // Temp directory which is deleted after the unit test.
+       private static TemporaryFolder tmp = new TemporaryFolder();
+
+       protected static MiniYARNCluster yarnCluster = null;
+
+       protected static File flinkUberjar;
+       private static File yarnConfFile;
+
+       protected static final Configuration yarnConfiguration;
+       static {
+               yarnConfiguration = new YarnConfiguration();
+               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
+               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
4096); // 4096 is the available memory anyways
+               
yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 
true);
+               
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
 true);
+               yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
2);
+               
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
+               
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+               
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+               yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
+               // so we have to change the number of cores for testing.
+       }
+
+       // This code is taken from: http://stackoverflow.com/a/7201825/568695
+       // it changes the environment variables of this JVM. Use only for 
testing purposes!
+       @SuppressWarnings("unchecked")
+       private static void setEnv(Map<String, String> newenv) {
+               try {
+                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
+                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
+                       theEnvironmentField.setAccessible(true);
+                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
+                       env.putAll(newenv);
+                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+                       theCaseInsensitiveEnvironmentField.setAccessible(true);
+                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
+                       cienv.putAll(newenv);
+               } catch (NoSuchFieldException e) {
+                       try {
+                               Class[] classes = 
Collections.class.getDeclaredClasses();
+                               Map<String, String> env = System.getenv();
+                               for (Class cl : classes) {
+                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                                               Field field = 
cl.getDeclaredField("m");
+                                               field.setAccessible(true);
+                                               Object obj = field.get(env);
+                                               Map<String, String> map = 
(Map<String, String>) obj;
+                                               map.clear();
+                                               map.putAll(newenv);
+                                       }
+                               }
+                       } catch (Exception e2) {
+                               throw new RuntimeException(e2);
+                       }
+               } catch (Exception e1) {
+                       throw new RuntimeException(e1);
+               }
+       }
+
+       /**
+        * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests)
+        */
+       @After
+       public void sleep() {
+               try {
+                       Thread.sleep(500);
+               } catch (InterruptedException e) {
+                       Assert.fail("Should not happen");
+               }
+       }
+
+       @Before
+       public void checkClusterEmpty() throws IOException, YarnException {
+               YarnClient yarnClient = YarnClient.createYarnClient();
+               yarnClient.init(yarnConfiguration);
+               yarnClient.start();
+               List<ApplicationReport> apps = yarnClient.getApplications();
+               for(ApplicationReport app : apps) {
+                       if(app.getYarnApplicationState() != 
YarnApplicationState.FINISHED) {
+                               Assert.fail("There is at least one application 
on the cluster is not finished");
+                       }
+               }
+       }
+
+       /**
+        * Locate a file or diretory directory
+        */
+       public static File findFile(String startAt, FilenameFilter fnf) {
+               File root = new File(startAt);
+               String[] files = root.list();
+               if(files == null) {
+                       return null;
+               }
+               for(String file : files) {
+
+                       File f = new File(startAt + File.separator + file);
+                       if(f.isDirectory()) {
+                               File r = findFile(f.getAbsolutePath(), fnf);
+                               if(r != null) {
+                                       return r;
+                               }
+                       } else if (fnf.accept(f.getParentFile(), f.getName())) {
+                               return f;
+                       }
+
+               }
+               return null;
+       }
+
+       /**
+        * Filter to find root dir of the flink-yarn dist.
+        */
+       public static class RootDirFilenameFilter implements FilenameFilter {
+               @Override
+               public boolean accept(File dir, String name) {
+                       return name.startsWith("flink-dist") && 
name.endsWith(".jar") && dir.toString().contains("/lib");
+               }
+       }
+       public static class ContainsName implements FilenameFilter {
+               private String name;
+               private String excludeInPath = null;
+
+               public ContainsName(String name) {
+                       this.name = name;
+               }
+
+               public ContainsName(String name, String excludeInPath) {
+                       this.name = name;
+                       this.excludeInPath = excludeInPath;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       if(excludeInPath == null) {
+                               return name.contains(this.name);
+                       } else {
+                               return name.contains(this.name) && 
!dir.toString().contains(excludeInPath);
+                       }
+               }
+       }
+
+       public static File writeYarnSiteConfigXML(Configuration yarnConf) 
throws IOException {
+               tmp.create();
+               File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + 
"/yarn-site.xml");
+
+               FileWriter writer = new FileWriter(yarnSiteXML);
+               yarnConf.writeXml(writer);
+               writer.flush();
+               writer.close();
+               return yarnSiteXML;
+       }
+
+       /**
+        * This method checks the written TaskManager and JobManager log files
+        * for exceptions.
+        */
+       public static void ensureNoExceptionsInLogFiles() {
+               File cwd = new File("target/"+TEST_CLUSTER_NAME);
+               Assert.assertTrue("Expecting directory 
"+cwd.getAbsolutePath()+" to exist", cwd.exists());
+               Assert.assertTrue("Expecting directory 
"+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+               System.out.println("cwd = "+cwd.getAbsolutePath());
+               File foundFile = findFile(cwd.getAbsolutePath(), new 
FilenameFilter() {
+                       @Override
+                       public boolean accept(File dir, String name) {
+                               File f = new File(dir.getAbsolutePath()+ "/" + 
name);
+                               // scan each file for 'Exception'.
+                               Scanner scanner =  null;
+                               try {
+                                       scanner = new Scanner(f);
+                               } catch (FileNotFoundException e) {
+                                       LOG.warn("Unable to locate file: 
"+e.getMessage()+" file: "+f.getAbsolutePath());
+                               }
+                               while (scanner.hasNextLine()) {
+                                       final String lineFromFile = 
scanner.nextLine();
+                                       if(lineFromFile.contains("Exception")) {
+                                               return true;
+                                       }
+                               }
+                               return false;
+                       }
+               });
+               if(foundFile != null) {
+                       Scanner scanner =  null;
+                       try {
+                               scanner = new Scanner(foundFile);
+                       } catch (FileNotFoundException e) {
+                               Assert.fail("Unable to locate file: 
"+e.getMessage()+" file: "+foundFile.getAbsolutePath());
+                       }
+                       LOG.warn("Found a file with an exception. Printing 
contents:");
+                       while (scanner.hasNextLine()) {
+                               LOG.warn("LINE: "+scanner.nextLine());
+                       }
+                       Assert.fail("Found a file "+foundFile+" with an 
exception");
+               }
+       }
+
+       public static void startYARNWithConfig(Configuration conf) {
+               flinkUberjar = findFile("..", new RootDirFilenameFilter());
+               Assert.assertNotNull(flinkUberjar);
+               String flinkDistRootDir = 
flinkUberjar.getParentFile().getParent();
+
+               if (!flinkUberjar.exists()) {
+                       Assert.fail("Unable to locate yarn-uberjar.jar");
+               }
+
+               try {
+                       LOG.info("Starting up MiniYARN cluster");
+                       if (yarnCluster == null) {
+                               yarnCluster = new 
MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
+
+                               yarnCluster.init(conf);
+                               yarnCluster.start();
+                       }
+
+                       Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+                       File flinkConfFilePath = findFile(flinkDistRootDir, new 
ContainsName("flink-conf.yaml"));
+                       Assert.assertNotNull(flinkConfFilePath);
+                       map.put("FLINK_CONF_DIR", 
flinkConfFilePath.getParent());
+                       yarnConfFile = writeYarnSiteConfigXML(conf);
+                       map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
+                       map.put("IN_TESTS", "yes we are in tests"); // see 
FlinkYarnClient() for more infos
+                       setEnv(map);
+
+                       Assert.assertTrue(yarnCluster.getServiceState() == 
Service.STATE.STARTED);
+               } catch (Exception ex) {
+                       ex.printStackTrace();
+                       LOG.error("setup failure", ex);
+                       Assert.fail();
+               }
+       }
+
+       /**
+        * Default @BeforeClass impl. Overwrite this for passing a different 
configuration
+        */
+       @BeforeClass
+       public static void setup() {
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       // -------------------------- Runner -------------------------- //
+
+       private static ByteArrayOutputStream outContent;
+       private static ByteArrayOutputStream errContent;
+       enum RunTypes {
+               YARN_SESSION, CLI_FRONTEND
+       }
+
+       protected void runWithArgs(String[] args, String expect, RunTypes type) 
{
+               LOG.info("Running with args {}", Arrays.toString(args));
+
+               outContent = new ByteArrayOutputStream();
+               errContent = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(outContent));
+               System.setErr(new PrintStream(errContent));
+
+
+               final int START_TIMEOUT_SECONDS = 60;
+
+               Runner runner = new Runner(args, type);
+               runner.start();
+
+               boolean expectedStringSeen = false;
+               for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               Assert.fail("Interruption not expected");
+                       }
+                       // check output for correct TaskManager startup.
+                       if(outContent.toString().contains(expect)
+                                       || 
errContent.toString().contains(expect) ) {
+                               expectedStringSeen = true;
+                               LOG.info("Found expected output in redirected 
streams");
+                               // send "stop" command to command line interface
+                               runner.sendStop();
+                               // wait for the thread to stop
+                               try {
+                                       runner.join(1000);
+                               } catch (InterruptedException e) {
+                                       LOG.warn("Interrupted while stopping 
runner", e);
+                               }
+                               LOG.warn("stopped");
+                               break;
+                       }
+                       // check if thread died
+                       if(!runner.isAlive()) {
+                               sendOutput();
+                               Assert.fail("Runner thread died before the test 
was finished. Return value = "+runner.getReturnValue());
+                       }
+               }
+
+               sendOutput();
+               Assert.assertTrue("During the timeout period of " + 
START_TIMEOUT_SECONDS + " seconds the " +
+                               "expected string did not show up", 
expectedStringSeen);
+               LOG.info("Test was successful");
+       }
+
+       private static void sendOutput() {
+               System.setOut(originalStdout);
+               System.setErr(originalStderr);
+
+               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
outContent.toString());
+               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
errContent.toString());
+       }
+
+       public static class Runner extends Thread {
+               private final String[] args;
+               private int returnValue;
+               private RunTypes type;
+               private FlinkYarnSessionCli yCli;
+
+               public Runner(String[] args, RunTypes type) {
+                       this.args = args;
+                       this.type = type;
+               }
+
+               public int getReturnValue() {
+                       return returnValue;
+               }
+
+               @Override
+               public void run() {
+                       switch(type) {
+                               case YARN_SESSION:
+                                       yCli = new FlinkYarnSessionCli("", "");
+                                       returnValue = yCli.run(args);
+                                       break;
+                               case CLI_FRONTEND:
+                                       try {
+                                               CliFrontend cli = new 
CliFrontend();
+                                               returnValue = 
cli.parseParameters(args);
+                                       } catch (Exception e) {
+                                               throw new RuntimeException(e);
+                                       }
+                                       break;
+                               default:
+                                       throw new RuntimeException("Unknown 
type " + type);
+                       }
+
+                       if(returnValue != 0) {
+                               Assert.fail("The YARN session returned with 
non-null value="+returnValue);
+                       }
+               }
+
+               public void sendStop() {
+                       if(yCli != null) {
+                               yCli.stop();
+                       }
+               }
+       }
+
+       // -------------------------- Tear down -------------------------- //
+
+       @AfterClass
+       public static void tearDown() {
+               //shutdown YARN cluster
+               if (yarnCluster != null) {
+                       LOG.info("shutdown MiniYarn cluster");
+                       yarnCluster.stop();
+                       yarnCluster = null;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties 
b/flink-yarn-tests/src/main/resources/log4j-test.properties
new file mode 100644
index 0000000..b4dbbe0
--- /dev/null
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, file
+
+# Log all infos in the given file
+log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index 9fd2541..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-public class UtilsTest {
-
-       @Test
-       public void testUberjarLocator() {
-               File dir = YarnTestBase.findFile(".", new 
YarnTestBase.RootDirFilenameFilter());
-               Assert.assertNotNull(dir);
-               dir = dir.getParentFile().getParentFile(); // from uberjar to 
lib to root
-               Assert.assertTrue(dir.exists());
-               Assert.assertTrue(dir.isDirectory());
-               Assert.assertTrue(dir.toString().contains("flink-dist"));
-               List<String> files = Arrays.asList(dir.list());
-               Assert.assertTrue(files.contains("lib"));
-               Assert.assertTrue(files.contains("bin"));
-               Assert.assertTrue(files.contains("conf"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
deleted file mode 100644
index 7da355b..0000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.addTestAppender;
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a CapacityScheduler.
- * Is has, by default a queue called "default". The configuration here adds 
another queue: "qa-team".
- */
-public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
-
-       @BeforeClass
-       public static void setup() {
-               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
-               yarnConfiguration.set("yarn.scheduler.capacity.root.queues", 
"default,qa-team");
-               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
-               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       /**
-        * Test regular operation, including command line parameter parsing.
-        */
-       @Test
-       public void testClientStartup() {
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                                               "-n", "1",
-                                               "-jm", "512",
-                                               "-tm", "1024", "-qu", 
"qa-team"},
-                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION);
-
-               ensureNoExceptionsInLogFiles();
-       }
-
-
-       /**
-        * Test deployment to non-existing queue. (user-reported error)
-        * Deployment to the queue is possible because there are no queues, so 
we don't check.
-        */
-       @Test
-       public void testNonexistingQueue() {
-               addTestAppender();
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                               "-n", "1",
-                               "-jm", "512",
-                               "-tm", "1024",
-                               "-qu", "doesntExist"}, "to unknown queue: 
doesntExist", RunTypes.YARN_SESSION);
-               checkForLogString("The specified queue 'doesntExist' does not 
exist. Available queues: default, qa-team");
-
-               ensureNoExceptionsInLogFiles();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
deleted file mode 100644
index d5f301b..0000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test starts a MiniYARNCluster with a FIFO scheudler.
- * There are no queues for that scheduler.
- */
-public class YARNSessionFIFOITCase extends YarnTestBase {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
-
-       /*
-       Override init with FIFO scheduler.
-        */
-       @BeforeClass
-       public static void setup() {
-               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
FifoScheduler.class, ResourceScheduler.class);
-               yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
-               startYARNWithConfig(yarnConfiguration);
-       }
-       /**
-        * Test regular operation, including command line parameter parsing.
-        */
-       @Test
-       public void testClientStartup() {
-               LOG.info("Starting testClientStartup()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                                               "-n", "1",
-                                               "-jm", "512",
-                                               "-tm", "1024"},
-                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION);
-               LOG.info("Finished testClientStartup()");
-               ensureNoExceptionsInLogFiles();
-       }
-
-
-       /**
-        * Test querying the YARN cluster.
-        *
-        * This test validates through 666*2 cores in the "cluster".
-        */
-       @Test
-       public void testQueryCluster() {
-               LOG.info("Starting testQueryCluster()");
-               runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 
totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
-               LOG.info("Finished testQueryCluster()");
-               ensureNoExceptionsInLogFiles();
-       }
-
-       /**
-        * Test deployment to non-existing queue. (user-reported error)
-        * Deployment to the queue is possible because there are no queues, so 
we don't check.
-        */
-       @Test
-       public void testNonexistingQueue() {
-               LOG.info("Starting testNonexistingQueue()");
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-                               "-n", "1",
-                               "-jm", "512",
-                               "-tm", "1024",
-                               "-qu", "doesntExist"}, "Number of connected 
TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
-               LOG.info("Finished testNonexistingQueue()");
-               ensureNoExceptionsInLogFiles();
-       }
-
-       /**
-        * Test requesting more resources than available.
-        */
-       @Test
-       public void testMoreNodesThanAvailable() {
-               if(ignoreOnTravis()) {
-                       return;
-               }
-               addTestAppender();
-               LOG.info("Starting testMoreNodesThanAvailable()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                               "-n", "10",
-                               "-jm", "512",
-                               "-tm", "1024"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends 
on the speed of the test hardware
-               LOG.info("Finished testMoreNodesThanAvailable()");
-               checkForLogString("This YARN session requires 10752MB of memory 
in the cluster. There are currently only 8192MB available.");
-               ensureNoExceptionsInLogFiles();
-       }
-
-       /**
-        * The test cluster has the following resources:
-        * - 2 Nodes with 4096 MB each.
-        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-        *
-        * We allocate:
-        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
-        * 5 TaskManagers with 1585 MB
-        *
-        * user sees a total request of: 8181 MB (fits)
-        * system sees a total request of: 8437 (doesn't fit due to min alloc 
mb)
-        */
-       @Test
-       public void testResourceComputation() {
-               if(ignoreOnTravis()) {
-                       return;
-               }
-               addTestAppender();
-               LOG.info("Starting testResourceComputation()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                               "-n", "5",
-                               "-jm", "256",
-                               "-tm", "1585"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION);
-               LOG.info("Finished testResourceComputation()");
-               checkForLogString("This YARN session requires 8437MB of memory 
in the cluster. There are currently only 8192MB available.");
-       }
-
-       /**
-        * The test cluster has the following resources:
-        * - 2 Nodes with 4096 MB each.
-        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-        *
-        * We allocate:
-        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
-        * 2 TaskManagers with 3840 MB
-        *
-        * the user sees a total request of: 7936 MB (fits)
-        * the system sees a request of: 8192 MB (fits)
-        * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which 
doesn't fit.
-        *
-        * --> check if the system properly rejects allocating this session.
-        */
-       @Test
-       public void testfullAlloc() {
-               if(ignoreOnTravis()) {
-                       return;
-               }
-               addTestAppender();
-               LOG.info("Starting testfullAlloc()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
-                               "-n", "2",
-                               "-jm", "256",
-                               "-tm", "3840"}, "Number of connected 
TaskManagers changed to", RunTypes.YARN_SESSION);
-               LOG.info("Finished testfullAlloc()");
-               checkForLogString("There is not enough memory available in the 
YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: 
[4096, 4096]\n" +
-                               "After allocating the JobManager (512MB) and 
(1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
-               ensureNoExceptionsInLogFiles();
-       }
-
-       /**
-        * Test per-job yarn cluster
-        *
-        * This also tests the prefixed CliFrontend options for the YARN case
-        */
-       @Test
-       public void perJobYarnCluster() {
-               LOG.info("Starting perJobYarnCluster()");
-               File exampleJarLocation = YarnTestBase.findFile(".", new 
ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount 
here.
-               runWithArgs(new String[] {"run", "-m", "yarn-cluster",
-                               "-yj", flinkUberjar.getAbsolutePath(),
-                               "-yn", "1",
-                               "-yjm", "512",
-                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()}, "Job execution switched to status 
FINISHED.", RunTypes.CLI_FRONTEND);
-               LOG.info("Finished perJobYarnCluster()");
-               ensureNoExceptionsInLogFiles();
-       }
-
-       /**
-        * Test the YARN Java API
-        */
-       @Test
-       public void testJavaAPI() {
-               final int WAIT_TIME = 15;
-               LOG.info("Starting testJavaAPI()");
-
-               AbstractFlinkYarnClient flinkYarnClient = 
FlinkYarnSessionCli.getFlinkYarnClient();
-               flinkYarnClient.setTaskManagerCount(1);
-               flinkYarnClient.setJobManagerMemory(512);
-               flinkYarnClient.setTaskManagerMemory(512);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               String confDirPath = System.getenv("FLINK_CONF_DIR");
-               flinkYarnClient.setConfigurationDirectory(confDirPath);
-               
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
-               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
-
-               // deploy
-               AbstractFlinkYarnCluster yarnCluster = null;
-               try {
-                       yarnCluster = flinkYarnClient.deploy(null);
-               } catch (Exception e) {
-                       System.err.println("Error while deploying YARN cluster: 
"+e.getMessage());
-                       e.printStackTrace(System.err);
-                       Assert.fail();
-               }
-               FlinkYarnClusterStatus expectedStatus = new 
FlinkYarnClusterStatus(1, 1);
-               for(int second = 0; second < WAIT_TIME * 2; second++) { // run 
"forever"
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException e) {
-                               LOG.warn("Interrupted", e);
-                               Thread.interrupted();
-                       }
-                       FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
-                       if(status != null && status.equals(expectedStatus)) {
-                               LOG.info("Cluster reached status " + status);
-                               break; // all good, cluster started
-                       }
-                       if(second > WAIT_TIME) {
-                               // we waited for 15 seconds. cluster didn't 
come up correctly
-                               Assert.fail("The custer didn't start after " + 
WAIT_TIME + " seconds");
-                       }
-               }
-
-               // use the cluster
-               Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-               Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-               LOG.info("Shutting down cluster. All tests passed");
-               // shutdown cluster
-               yarnCluster.shutdown();
-               LOG.info("Finished testJavaAPI()");
-
-               ensureNoExceptionsInLogFiles();
-       }
-
-       public boolean ignoreOnTravis() {
-               if(System.getenv("TRAVIS") != null && 
System.getenv("TRAVIS").equals("true")) {
-                       // we skip the test until we are able to start a 
smaller yarn clsuter
-                       // right now, the miniyarncluster has the size of the 
nodemanagers fixed on 4 GBs.
-                       LOG.warn("Skipping test on travis for now");
-                       return true;
-               }
-               return false;
-       }
-
-       //
-       // --------------- Tools to test if a certain string has been logged 
with Log4j. -------------
-       // See :  
http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
-       //
-       private static TestAppender testAppender;
-       public static void addTestAppender() {
-               testAppender = new TestAppender();
-               
org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
-       }
-
-       public static void checkForLogString(String expected) {
-               if(testAppender == null) {
-                       throw new NullPointerException("Initialize it first");
-               }
-               LoggingEvent found = null;
-               for(LoggingEvent event: testAppender.events) {
-                       if(event.getMessage().toString().contains(expected)) {
-                               found = event;
-                               break;
-                       }
-               }
-               if(found != null) {
-                       LOG.info("Found expected string '"+expected+"' in log 
message "+found);
-                       return;
-               }
-               Assert.fail("Unable to find expected string '"+expected+"' in 
log messages");
-       }
-
-       public static class TestAppender extends AppenderSkeleton {
-               public List<LoggingEvent> events = new 
ArrayList<LoggingEvent>();
-               public void close() {}
-               public boolean requiresLayout() {return false;}
-               @Override
-               protected void append(LoggingEvent event) {
-                       events.add(event);
-               }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
deleted file mode 100644
index 65517d3..0000000
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-
-
-/**
- * This base class allows to use the MiniYARNCluster.
- * The cluster is re-used for all tests.
- *
- * This class is located in a different package which is build after 
flink-dist. This way,
- * we can use the YARN uberjar of flink to start a Flink YARN session.
- */
-public abstract class YarnTestBase {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YarnTestBase.class);
-
-       private final static PrintStream originalStdout = System.out;
-       private final static PrintStream originalStderr = System.err;
-
-       private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
-
-
-       // Temp directory which is deleted after the unit test.
-       private static TemporaryFolder tmp = new TemporaryFolder();
-
-       protected static MiniYARNCluster yarnCluster = null;
-
-       protected static File flinkUberjar;
-       private static File yarnConfFile;
-
-       protected static final Configuration yarnConfiguration;
-       static {
-               yarnConfiguration = new YarnConfiguration();
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
4096); // 4096 is the available memory anyways
-               
yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 
true);
-               
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
 true);
-               yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
2);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
-               
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
-               
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
-               yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
-               // so we have to change the number of cores for testing.
-       }
-
-       // This code is taken from: http://stackoverflow.com/a/7201825/568695
-       // it changes the environment variables of this JVM. Use only for 
testing purposes!
-       @SuppressWarnings("unchecked")
-       private static void setEnv(Map<String, String> newenv) {
-               try {
-                       Class<?> processEnvironmentClass = 
Class.forName("java.lang.ProcessEnvironment");
-                       Field theEnvironmentField = 
processEnvironmentClass.getDeclaredField("theEnvironment");
-                       theEnvironmentField.setAccessible(true);
-                       Map<String, String> env = (Map<String, String>) 
theEnvironmentField.get(null);
-                       env.putAll(newenv);
-                       Field theCaseInsensitiveEnvironmentField = 
processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-                       theCaseInsensitiveEnvironmentField.setAccessible(true);
-                       Map<String, String> cienv = (Map<String, String>) 
theCaseInsensitiveEnvironmentField.get(null);
-                       cienv.putAll(newenv);
-               } catch (NoSuchFieldException e) {
-                       try {
-                               Class[] classes = 
Collections.class.getDeclaredClasses();
-                               Map<String, String> env = System.getenv();
-                               for (Class cl : classes) {
-                                       if 
("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-                                               Field field = 
cl.getDeclaredField("m");
-                                               field.setAccessible(true);
-                                               Object obj = field.get(env);
-                                               Map<String, String> map = 
(Map<String, String>) obj;
-                                               map.clear();
-                                               map.putAll(newenv);
-                                       }
-                               }
-                       } catch (Exception e2) {
-                               throw new RuntimeException(e2);
-                       }
-               } catch (Exception e1) {
-                       throw new RuntimeException(e1);
-               }
-       }
-
-       /**
-        * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests)
-        */
-       @After
-       public void sleep() {
-               try {
-                       Thread.sleep(500);
-               } catch (InterruptedException e) {
-                       Assert.fail("Should not happen");
-               }
-       }
-
-       @Before
-       public void checkClusterEmpty() throws IOException, YarnException {
-               YarnClient yarnClient = YarnClient.createYarnClient();
-               yarnClient.init(yarnConfiguration);
-               yarnClient.start();
-               List<ApplicationReport> apps = yarnClient.getApplications();
-               for(ApplicationReport app : apps) {
-                       if(app.getYarnApplicationState() != 
YarnApplicationState.FINISHED) {
-                               Assert.fail("There is at least one application 
on the cluster is not finished");
-                       }
-               }
-       }
-
-       /**
-        * Locate a file or diretory directory
-        */
-       public static File findFile(String startAt, FilenameFilter fnf) {
-               File root = new File(startAt);
-               String[] files = root.list();
-               if(files == null) {
-                       return null;
-               }
-               for(String file : files) {
-
-                       File f = new File(startAt + File.separator + file);
-                       if(f.isDirectory()) {
-                               File r = findFile(f.getAbsolutePath(), fnf);
-                               if(r != null) {
-                                       return r;
-                               }
-                       } else if (fnf.accept(f.getParentFile(), f.getName())) {
-                               return f;
-                       }
-
-               }
-               return null;
-       }
-
-       /**
-        * Filter to find root dir of the flink-yarn dist.
-        */
-       public static class RootDirFilenameFilter implements FilenameFilter {
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.endsWith("yarn-uberjar.jar") && 
dir.toString().contains("/lib");
-               }
-       }
-       public static class ContainsName implements FilenameFilter {
-               private String name;
-               private String excludeInPath = null;
-
-               public ContainsName(String name) {
-                       this.name = name;
-               }
-
-               public ContainsName(String name, String excludeInPath) {
-                       this.name = name;
-                       this.excludeInPath = excludeInPath;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       if(excludeInPath == null) {
-                               return name.contains(this.name);
-                       } else {
-                               return name.contains(this.name) && 
!dir.toString().contains(excludeInPath);
-                       }
-               }
-       }
-
-       public static File writeYarnSiteConfigXML(Configuration yarnConf) 
throws IOException {
-               tmp.create();
-               File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + 
"/yarn-site.xml");
-
-               FileWriter writer = new FileWriter(yarnSiteXML);
-               yarnConf.writeXml(writer);
-               writer.flush();
-               writer.close();
-               return yarnSiteXML;
-       }
-
-       /**
-        * This method checks the written TaskManager and JobManager log files
-        * for exceptions.
-        */
-       public static void ensureNoExceptionsInLogFiles() {
-               File cwd = new File("target/"+TEST_CLUSTER_NAME);
-               Assert.assertTrue("Expecting directory 
"+cwd.getAbsolutePath()+" to exist", cwd.exists());
-               Assert.assertTrue("Expecting directory 
"+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
-               System.out.println("cwd = "+cwd.getAbsolutePath());
-               File foundFile = findFile(cwd.getAbsolutePath(), new 
FilenameFilter() {
-                       @Override
-                       public boolean accept(File dir, String name) {
-                               File f = new File(dir.getAbsolutePath()+ "/" + 
name);
-                               // scan each file for 'Exception'.
-                               Scanner scanner =  null;
-                               try {
-                                       scanner = new Scanner(f);
-                               } catch (FileNotFoundException e) {
-                                       Assert.fail("Unable to locate file: 
"+e.getMessage()+" file: "+f.getAbsolutePath());
-                               }
-                               while (scanner.hasNextLine()) {
-                                       final String lineFromFile = 
scanner.nextLine();
-                                       if(lineFromFile.contains("Exception")) {
-                                               return true;
-                                       }
-                               }
-                               return false;
-                       }
-               });
-               if(foundFile != null) {
-                       Scanner scanner =  null;
-                       try {
-                               scanner = new Scanner(foundFile);
-                       } catch (FileNotFoundException e) {
-                               Assert.fail("Unable to locate file: 
"+e.getMessage()+" file: "+foundFile.getAbsolutePath());
-                       }
-                       LOG.warn("Found a file with an exception. Printing 
contents:");
-                       while (scanner.hasNextLine()) {
-                               LOG.warn("LINE: "+scanner.nextLine());
-                       }
-                       Assert.fail("Found a file "+foundFile+" with an 
exception");
-               }
-       }
-
-       public static void main(String[] args) {
-               ensureNoExceptionsInLogFiles();
-       }
-
-       public static void startYARNWithConfig(Configuration conf) {
-               flinkUberjar = findFile(".", new RootDirFilenameFilter());
-               Assert.assertNotNull(flinkUberjar);
-               String flinkDistRootDir = 
flinkUberjar.getParentFile().getParent();
-
-               if (!flinkUberjar.exists()) {
-                       Assert.fail("Unable to locate yarn-uberjar.jar");
-               }
-
-               try {
-                       LOG.info("Starting up MiniYARN cluster");
-                       if (yarnCluster == null) {
-                               yarnCluster = new 
MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
-
-                               yarnCluster.init(conf);
-                               yarnCluster.start();
-                       }
-
-                       Map<String, String> map = new HashMap<String, 
String>(System.getenv());
-                       File flinkConfFilePath = findFile(flinkDistRootDir, new 
ContainsName("flink-conf.yaml"));
-                       Assert.assertNotNull(flinkConfFilePath);
-                       map.put("FLINK_CONF_DIR", 
flinkConfFilePath.getParent());
-                       yarnConfFile = writeYarnSiteConfigXML(conf);
-                       map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
-                       map.put("IN_TESTS", "yes we are in tests"); // see 
FlinkYarnClient() for more infos
-                       setEnv(map);
-
-                       Assert.assertTrue(yarnCluster.getServiceState() == 
Service.STATE.STARTED);
-               } catch (Exception ex) {
-                       ex.printStackTrace();
-                       LOG.error("setup failure", ex);
-                       Assert.fail();
-               }
-       }
-
-       /**
-        * Default @BeforeClass impl. Overwrite this for passing a different 
configuration
-        */
-       @BeforeClass
-       public static void setup() {
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       // -------------------------- Runner -------------------------- //
-
-       private static ByteArrayOutputStream outContent;
-       private static ByteArrayOutputStream errContent;
-       enum RunTypes {
-               YARN_SESSION, CLI_FRONTEND
-       }
-
-       protected void runWithArgs(String[] args, String expect, RunTypes type) 
{
-               LOG.info("Running with args {}", Arrays.toString(args));
-
-               outContent = new ByteArrayOutputStream();
-               errContent = new ByteArrayOutputStream();
-               System.setOut(new PrintStream(outContent));
-               System.setErr(new PrintStream(errContent));
-
-
-               final int START_TIMEOUT_SECONDS = 60;
-
-               Runner runner = new Runner(args, type);
-               runner.start();
-
-               boolean expectedStringSeen = false;
-               for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException e) {
-                               Assert.fail("Interruption not expected");
-                       }
-                       // check output for correct TaskManager startup.
-                       if(outContent.toString().contains(expect)
-                                       || 
errContent.toString().contains(expect) ) {
-                               expectedStringSeen = true;
-                               LOG.info("Found expected output in redirected 
streams");
-                               // send "stop" command to command line interface
-                               runner.sendStop();
-                               // wait for the thread to stop
-                               try {
-                                       runner.join(1000);
-                               } catch (InterruptedException e) {
-                                       LOG.warn("Interrupted while stopping 
runner", e);
-                               }
-                               LOG.warn("stopped");
-                               break;
-                       }
-                       // check if thread died
-                       if(!runner.isAlive()) {
-                               sendOutput();
-                               Assert.fail("Runner thread died before the test 
was finished. Return value = "+runner.getReturnValue());
-                       }
-               }
-
-               sendOutput();
-               Assert.assertTrue("During the timeout period of " + 
START_TIMEOUT_SECONDS + " seconds the " +
-                               "expected string did not show up", 
expectedStringSeen);
-               LOG.info("Test was successful");
-       }
-
-       private static void sendOutput() {
-               System.setOut(originalStdout);
-               System.setErr(originalStderr);
-
-               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
outContent.toString());
-               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
errContent.toString());
-       }
-
-       public static class Runner extends Thread {
-               private final String[] args;
-               private int returnValue;
-               private RunTypes type;
-               private FlinkYarnSessionCli yCli;
-
-               public Runner(String[] args, RunTypes type) {
-                       this.args = args;
-                       this.type = type;
-               }
-
-               public int getReturnValue() {
-                       return returnValue;
-               }
-
-               @Override
-               public void run() {
-                       switch(type) {
-                               case YARN_SESSION:
-                                       yCli = new FlinkYarnSessionCli("", "");
-                                       returnValue = yCli.run(args);
-                                       break;
-                               case CLI_FRONTEND:
-                                       try {
-                                               CliFrontend cli = new 
CliFrontend();
-                                               returnValue = 
cli.parseParameters(args);
-                                       } catch (Exception e) {
-                                               throw new RuntimeException(e);
-                                       }
-                                       break;
-                               default:
-                                       throw new RuntimeException("Unknown 
type " + type);
-                       }
-
-                       if(returnValue != 0) {
-                               Assert.fail("The YARN session returned with 
non-null value="+returnValue);
-                       }
-               }
-
-               public void sendStop() {
-                       if(yCli != null) {
-                               yCli.stop();
-                       }
-               }
-       }
-
-       // -------------------------- Tear down -------------------------- //
-
-       @AfterClass
-       public static void tearDown() {
-               //shutdown YARN cluster
-               if (yarnCluster != null) {
-                       LOG.info("shutdown MiniYarn cluster");
-                       yarnCluster.stop();
-                       yarnCluster = null;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties 
b/flink-yarn-tests/src/test/resources/log4j-test.properties
deleted file mode 100644
index b4dbbe0..0000000
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=WARN, file
-
-# Log all infos in the given file
-log4j.appender.file=org.apache.log4j.ConsoleAppender
-log4j.appender.file.append=false
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 1569f15..805543e 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -51,6 +51,12 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>${shading-artifact.name}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
                        <groupId>com.typesafe.akka</groupId>
                        <artifactId>akka-actor_2.10</artifactId>
                </dependency>
@@ -66,38 +72,12 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.camel</groupId>
-                       <artifactId>camel-stream</artifactId>
-                       <version>2.14.0</version>
-               </dependency>
-
-               <!--  guava needs to be in "provided" scope, to make sure it is 
not included into the jars by the shading -->
-               <dependency>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                        <version>${guava.version}</version>
-                       <scope>provided</scope>
                </dependency>
                
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-yarn-client</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-common</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-mapreduce-client-core</artifactId>
-               </dependency>
+               
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 5c57292..16cb345 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -238,6 +239,15 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
 
        @Override
        public void setShipFiles(List<File> shipFiles) {
+               File shipFile;
+               for(Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) {
+                       shipFile = it.next();
+                       // remove uberjar from ship list (by default everything 
in the lib/ folder is added to
+                       // the list of files to ship, but we handle the uberjar 
separately.
+                       if(shipFile.getName().startsWith("flink-dist-") && 
shipFile.getName().endsWith("jar")) {
+                               it.remove();
+                       }
+               }
                this.shipFiles.addAll(shipFiles);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1d09d02..cf042ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@ under the License.
        </scm>
 
        <modules>
-               <module>flink-shaded</module>
+               <module>flink-shaded-hadoop</module>
                <module>flink-core</module>
                <module>flink-java</module>
                <module>flink-scala</module>
@@ -71,6 +71,7 @@ under the License.
        <properties>
                
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+               <shading-artifact.name>error</shading-artifact.name>
                <hadoop-one.version>1.2.1</hadoop-one.version>
                <hadoop-two.version>2.2.0</hadoop-two.version>
                <scala.version>2.10.4</scala.version>
@@ -81,13 +82,15 @@ under the License.
                <flink.reuseForks>true</flink.reuseForks>
                <log4j.configuration>log4j-test.properties</log4j.configuration>
                <slf4j.version>1.7.7</slf4j.version>
-               <guava.version>17.0</guava.version>
+               <guava.version>18.0</guava.version>
                <scala.version>2.10.4</scala.version>
                <akka.version>2.3.7</akka.version>
                <scala.binary.version>2.10</scala.binary.version>
                <scala.macros.version>2.0.1</scala.macros.version>
                <kryoserialization.version>0.3.2</kryoserialization.version>
                <protobuf.version>2.5.0</protobuf.version>
+               <chill.version>0.5.2</chill.version>
+               <asm.version>4.0</asm.version>
        </properties>
 
        <dependencies>
@@ -158,14 +161,16 @@ under the License.
        
        <!-- this section defines the module versions that are used if nothing 
else is specified. -->
        <dependencyManagement>
+               <!-- WARN: 
+                       DO NOT put      guava, 
+                                               protobuf, 
+                                               asm,
+                                               netty
+                                       here. It will overwrite Hadoop's guava 
dependency (even though we handle it
+                       separatly in the flink-shaded-hadoop module).
+                       We can use all guava versions everywhere by adding it 
directly as a dependency to each project.
+               -->
                <dependencies>
-               
-                       <!-- ASM is used by us, Kryo, Hadoop, ... -->
-                       <dependency>
-                               <groupId>org.ow2.asm</groupId>
-                               <artifactId>asm</artifactId>
-                               <version>4.0</version>
-                       </dependency>
 
                        <!-- Make sure we use a consistent jetty version 
throughout the project -->
                        <dependency>
@@ -207,6 +212,12 @@ under the License.
                                <artifactId>commons-cli</artifactId>
                                <version>1.2</version>
                        </dependency>
+
+                       <dependency>
+                               <groupId>commons-io</groupId>
+                               <artifactId>commons-io</artifactId>
+                               <version>2.4</version>
+                       </dependency>
                        
                        <!-- common-collections is used by us and by hadoop, so 
we need to define a common version -->
                        <dependency>
@@ -310,290 +321,6 @@ under the License.
                                        </exclusion>
                                </exclusions>
                        </dependency>
-
-
-                       <!-- "Old" stable Hadoop = MapReduce v1 -->
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-core</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-
-                       <!-- Hadoop 2 Dependencies -->
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-common</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>javax.servlet</groupId>
-                                               
<artifactId>servlet-api</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>javax.servlet.jsp</groupId>
-                                               <artifactId>jsp-api</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-hdfs</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>javax.servlet</groupId>
-                                               
<artifactId>servlet-api</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-client</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>javax.servlet</groupId>
-                                               
<artifactId>servlet-api</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               
<artifactId>hadoop-mapreduce-client-core</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.apache.hadoop</groupId>
-                                               
<artifactId>hadoop-yarn-common</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>com.google.inject.extensions</groupId>
-                                               
<artifactId>guice-servlet</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>io.netty</groupId>
-                                               <artifactId>netty</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-yarn-client</artifactId>
-                               <version>${hadoop.version}</version>
-                               <exclusions>
-                                       <exclusion>
-                                               <groupId>asm</groupId>
-                                               <artifactId>asm</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-compiler</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               <groupId>tomcat</groupId>
-                                               
<artifactId>jasper-runtime</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jetty</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jsp-api-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               <artifactId>jsp-2.1</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.mortbay.jetty</groupId>
-                                               
<artifactId>jetty-util</artifactId>
-                                       </exclusion>
-                                       <exclusion>
-                                               
<groupId>org.eclipse.jdt</groupId>
-                                               <artifactId>core</artifactId>
-                                       </exclusion>
-                               </exclusions>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-yarn-common</artifactId>
-                               <version>${hadoop.version}</version>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               
<artifactId>hadoop-yarn-server-tests</artifactId>
-                               <scope>test</scope>
-                               <version>${hadoop.version}</version>
-                       </dependency>
-                       <dependency>
-                               <groupId>org.apache.hadoop</groupId>
-                               <artifactId>hadoop-minicluster</artifactId>
-                               <scope>test</scope>
-                               <version>${hadoop.version}</version>
-                       </dependency>
                </dependencies>
        </dependencyManagement>
 
@@ -608,6 +335,7 @@ under the License.
                        </activation>
                        <properties>
                                
<hadoop.version>${hadoop-one.version}</hadoop.version>
+                               
<shading-artifact.name>flink-shaded-hadoop1</shading-artifact.name>
                        </properties>
                </profile>
                <profile>
@@ -620,6 +348,7 @@ under the License.
                        </activation>
                        <properties>
                                
<hadoop.version>${hadoop-two.version}</hadoop.version>
+                               
<shading-artifact.name>flink-shaded-hadoop2</shading-artifact.name>
                        </properties>
                </profile>
 
@@ -631,121 +360,14 @@ under the License.
                                <!--hadoop2--><name>!hadoop.profile</name>
                                </property>
                        </activation>
+                       <properties>
+                               
<shading-artifact.name>flink-shaded-include-yarn</shading-artifact.name>
+                       </properties>
                        <modules>
                                <module>flink-yarn</module>
                                <module>flink-yarn-tests</module>
                        </modules>
                </profile>
-               <profile>
-                       <id>hadoop-2.0.0-alpha</id>
-                       <activation>
-                               <property>
-                                       <name>hadoop.version</name>
-                                       <value>2.0.0-alpha</value>
-                               </property>
-                       </activation>
-                       <properties>
-                               <akka.version>2.2.1</akka.version>
-                               
<kryoserialization.version>0.3.1</kryoserialization.version>
-                               <protobuf.version>2.4.1</protobuf.version>
-                       </properties>
-                       <dependencyManagement>
-                               <dependencies>
-                                       <dependency>
-                                               
<groupId>org.apache.hadoop</groupId>
-                                               
<artifactId>hadoop-common</artifactId>
-                                               
<version>${hadoop.version}</version>
-                                               <exclusions>
-                                                       <!-- This is an 
additional exclusion (Netty) -->
-                                                       <exclusion>
-                                                               
<groupId>org.jboss.netty</groupId>
-                                                               
<artifactId>netty</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>asm</groupId>
-                                                               
<artifactId>asm</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>tomcat</groupId>
-                                                               
<artifactId>jasper-compiler</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>tomcat</groupId>
-                                                               
<artifactId>jasper-runtime</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jetty</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>javax.servlet</groupId>
-                                                               
<artifactId>servlet-api</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jsp-api-2.1</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jsp-2.1</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jetty-util</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.eclipse.jdt</groupId>
-                                                               
<artifactId>core</artifactId>
-                                                       </exclusion>
-                                               </exclusions>
-                                       </dependency>
-                                       <dependency>
-                                               
<groupId>org.apache.hadoop</groupId>
-                                               
<artifactId>hadoop-mapreduce-client-core</artifactId>
-                                               
<version>${hadoop.version}</version>
-                                               <exclusions>
-                                                       <!-- This is an 
additional exclusion (Netty) -->
-                                                       <exclusion>
-                                                               
<groupId>org.jboss.netty</groupId>
-                                                               
<artifactId>netty</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>asm</groupId>
-                                                               
<artifactId>asm</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>tomcat</groupId>
-                                                               
<artifactId>jasper-compiler</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>tomcat</groupId>
-                                                               
<artifactId>jasper-runtime</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jetty</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jsp-api-2.1</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jsp-2.1</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.mortbay.jetty</groupId>
-                                                               
<artifactId>jetty-util</artifactId>
-                                                       </exclusion>
-                                                       <exclusion>
-                                                               
<groupId>org.eclipse.jdt</groupId>
-                                                               
<artifactId>core</artifactId>
-                                                       </exclusion>
-                                               </exclusions>
-                                       </dependency>
-                               </dependencies>
-                       </dependencyManagement>
-               </profile>
 
                <profile>
                        <id>vendor-repos</id>
@@ -896,29 +518,6 @@ under the License.
                </profile>
        </profiles>
 
-       <reporting>
-               <plugins>
-                       <!-- execution of Unit Tests -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               
<artifactId>maven-surefire-report-plugin</artifactId>
-                               <version>2.17</version>
-                       </plugin>
-
-                       <!-- test coverage reports -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <artifactId>cobertura-maven-plugin</artifactId>
-                               <version>2.6</version>
-                               <configuration>
-                                       <formats>
-                                               <format>html</format>
-                                       </formats>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </reporting>
-
        <build>
                <plugins>
                        <plugin>
@@ -935,38 +534,6 @@ under the License.
                                </configuration>
                        </plugin>
                        
-                       <!-- Relocate references to Google Guava classes into a 
different namespace -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <version>2.3</version>
-                               <executions>
-                                       <execution>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<shadedArtifactAttached>false</shadedArtifactAttached>
-                                                       
<createDependencyReducedPom>false</createDependencyReducedPom>
-                                                       <artifactSet>
-                                                               <includes>
-                                                                       
<include>org.apache.flink:${project.artifact}</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations>
-                                                               <relocation>
-                                                                       
<pattern>com.google</pattern>
-                                                                       
<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
-                                                                       
<excludes>
-                                                                               
<exclude>com.google.protobuf.**</exclude>
-                                                                       
</excludes>
-                                                               </relocation>
-                                                       </relocations>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
                        <plugin>
                                <groupId>org.apache.rat</groupId>
                                <artifactId>apache-rat-plugin</artifactId>
@@ -1159,6 +726,54 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
+
+                       <!-- We use shading in all packages for relocating some 
classes, such as
+                               Guava and ASM.
+                               By doing so, users adding Flink as a dependency 
won't run into conflicts.
+                               (For example users can use whatever guava 
version they want, because we don't
+                               expose our guava dependency)
+                       -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <version>2.3</version>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<shadeTestJar>true</shadeTestJar>
+                                                       
<shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       
<createDependencyReducedPom>true</createDependencyReducedPom>
+                                                       
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       
<include>com.google.guava:*</include>
+                                                                       
<include>org.ow2.asm:*</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       
<pattern>com.google</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+                                                                       
<excludes>
+                                                                               
<exclude>com.google.protobuf.**</exclude>
+                                                                               
<exclude>com.google.inject.**</exclude>
+                                                                       
</excludes>
+                                                               </relocation>
+                                                               <relocation>
+                                                                       
<pattern>org.objectweb.asm</pattern>
+                                                                       
<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+                                                               </relocation>
+                                                       </relocations>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
                </plugins>
 
                <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/84e76f4d/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index d94f977..0875a7e 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 # Maven command to run. We set the forkCount manually, because otherwise Maven 
sees too many cores
 # on the Travis VMs.
-MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} 
-Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install verify"
+MVN="mvn -Dflink.forkCount=2 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} 
-Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"

Reply via email to