[hotfix] [yarn tests] Fix deadlock between YARN Session CLI tests and Surefire

The Surefire Plugin uses stdin to communicate with forked JVMs for tests.

The YARN Session CLI tests also try to read the stdin stream. The tests
deadlock because Surefire never releases the stdin locks during the lifetime
of a test.

This change adds a parameter that controls whether the YARN Session CLI should
try to read user console input, and sets it to false in the integration tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da23ee38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da23ee38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da23ee38

Branch: refs/heads/master
Commit: da23ee38e5b36ddf26a6a5a807efbbbcbfe1d517
Parents: 6511557
Author: Stephan Ewen <se...@apache.org>
Authored: Mon May 30 19:49:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 30 19:49:25 2016 +0200

----------------------------------------------------------------------
 .../flink/client/FlinkYarnSessionCli.java       | 24 +++++++++++++-------
 .../flink/client/cli/CliFrontendParser.java     |  2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |  2 +-
 .../src/test/resources/log4j-test.properties    |  3 ++-
 5 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 91f8df2..bb61ffb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -82,6 +82,8 @@ public class FlinkYarnSessionCli {
         */
        private final Option DYNAMIC_PROPERTIES;
 
+       private final boolean acceptInteractiveInput;
+       
        //------------------------------------ Internal fields 
-------------------------
        private AbstractFlinkYarnCluster yarnCluster = null;
        private boolean detachedMode = false;
@@ -89,7 +91,9 @@ public class FlinkYarnSessionCli {
        /** Default yarn application name. */
        private String defaultApplicationName = null;
 
-       public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+       public FlinkYarnSessionCli(String shortPrefix, String longPrefix, 
boolean acceptInteractiveInput) {
+               this.acceptInteractiveInput = acceptInteractiveInput;
+               
                QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
                QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
                SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
@@ -292,7 +296,7 @@ public class FlinkYarnSessionCli {
                propertiesFile.setReadable(true, false); // readable for all.
        }
 
-       public static void runInteractiveCli(AbstractFlinkYarnCluster 
yarnCluster) {
+       public static void runInteractiveCli(AbstractFlinkYarnCluster 
yarnCluster, boolean readConsoleInput) {
                final String HELP = "Available commands:\n" +
                                "help - show these commands\n" +
                                "stop - stop the YARN session";
@@ -304,6 +308,8 @@ public class FlinkYarnSessionCli {
                                // ------------------ check if there are 
updates by the cluster -----------
 
                                GetClusterStatusResponse status = 
yarnCluster.getClusterStatus();
+                               LOG.debug("Received status message: {}", 
status);
+
                                if (status != null && numTaskmanagers != 
status.numRegisteredTaskManagers()) {
                                        System.err.println("Number of connected 
TaskManagers changed to " +
                                                        
status.numRegisteredTaskManagers() + ". " +
@@ -324,15 +330,16 @@ public class FlinkYarnSessionCli {
                                        yarnCluster.shutdown(true);
                                }
 
-                               // wait until CLIENT_POLLING_INTERVALL is over 
or the user entered something.
+                               // wait until CLIENT_POLLING_INTERVAL is over 
or the user entered something.
                                long startTime = System.currentTimeMillis();
                                while ((System.currentTimeMillis() - startTime) 
< CLIENT_POLLING_INTERVALL * 1000
-                                               && !in.ready()) {
+                                               && (!readConsoleInput || 
!in.ready()))
+                               {
                                        Thread.sleep(200);
                                }
                                //------------- handle interactive command by 
user. ----------------------
-
-                               if (in.ready()) {
+                               
+                               if (readConsoleInput && in.ready()) {
                                        String command = in.readLine();
                                        switch (command) {
                                                case "quit":
@@ -347,6 +354,7 @@ public class FlinkYarnSessionCli {
                                                        break;
                                        }
                                }
+                               
                                if (yarnCluster.hasBeenStopped()) {
                                        LOG.info("Stopping interactive command 
line interface, YARN cluster has been stopped.");
                                        break;
@@ -358,7 +366,7 @@ public class FlinkYarnSessionCli {
        }
 
        public static void main(String[] args) {
-               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // 
no prefix for the YARN session
+               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", 
true); // no prefix for the YARN session
                System.exit(cli.run(args));
        }
 
@@ -458,7 +466,7 @@ public class FlinkYarnSessionCli {
                                                "Please also note that the 
temporary files of the YARN session in {} will not be removed.",
                                                
flinkYarnClient.getSessionFilesDir());
                        } else {
-                               runInteractiveCli(yarnCluster);
+                               runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
 
                                if (!yarnCluster.hasBeenStopped()) {
                                        LOG.info("Command Line Interface 
requested session shutdown");

http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 2ac53d2..b75952e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -35,7 +35,7 @@ public class CliFrontendParser {
 
        /** command line interface of the YARN session, with a special 
initialization here
         *  to prefix all options with y/yarn. */
-       private static final FlinkYarnSessionCli yarnSessionCLi = new 
FlinkYarnSessionCli("y", "yarn");
+       private static final FlinkYarnSessionCli yarnSessionCLi = new 
FlinkYarnSessionCli("y", "yarn", true);
 
 
        static final Option HELP_OPTION = new Option("h", "help", false,

http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 30116af..7197b64 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -53,7 +53,7 @@ public class FlinkYarnSessionCliTest {
                map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
                TestBaseUtils.setEnv(map);
                Options options = new Options();
-               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", 
false);
                cli.getYARNSessionCLIOptions(options);
 
                CommandLineParser parser = new PosixParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index fc1e5bc..03ab647 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -562,7 +562,7 @@ public abstract class YarnTestBase extends TestLogger {
                public void run() {
                        switch(type) {
                                case YARN_SESSION:
-                                       yCli = new FlinkYarnSessionCli("", "");
+                                       yCli = new FlinkYarnSessionCli("", "", 
false);
                                        returnValue = yCli.run(args);
                                        break;
                                case CLI_FRONTEND:

http://git-wip-us.apache.org/repos/asf/flink/blob/da23ee38/flink-yarn-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties 
b/flink-yarn-tests/src/test/resources/log4j-test.properties
index ebe0d37..e94ca26 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -30,6 +30,7 @@ 
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
 log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
 log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
 log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
-log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
+log4j.logger.org.apache.hadoop=OFF
 log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
+

Reply via email to