[hotfix] Clean up CliFrontend after removing web client

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17fa6a9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17fa6a9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17fa6a9b

Branch: refs/heads/master
Commit: 17fa6a9bc965eb4dcd64123b7d1c66a75c077db6
Parents: c0fd36b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 15 18:36:13 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Jan 16 15:46:56 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 125 ++++++++-----------
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  10 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java  |   9 +-
 .../flink/yarn/ApplicationMasterBase.scala      |   8 +-
 4 files changed, 62 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 7e1cef7..4b9bd06 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -19,13 +19,14 @@
 package org.apache.flink.client;
 
 import akka.actor.ActorSystem;
+
 import org.apache.commons.cli.CommandLine;
+
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -65,8 +66,10 @@ import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -87,6 +90,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -95,7 +99,6 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.getRequestRunningJobsStatus;
 
 /**
  * Implementation of a simple command line frontend for executing programs.
@@ -133,6 +136,7 @@ public class CliFrontend {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(CliFrontend.class);
 
+
        private final Configuration config;
 
        private final FiniteDuration askTimeout;
@@ -143,12 +147,6 @@ public class CliFrontend {
 
        private AbstractFlinkYarnCluster yarnCluster;
 
-       static boolean webFrontend = false;
-
-       private FlinkPlan optimizedPlan;
-
-       private PackagedProgram packagedProgram;
-
        /**
         *
         * @throws Exception Thrown if the configuration directory was not 
found, the configuration could not
@@ -222,9 +220,9 @@ public class CliFrontend {
 
                        // handle the YARN client's dynamic properties
                        String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-                       List<Tuple2<String, String>> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
-                       for (Tuple2<String, String> dynamicProperty : 
dynamicProperties) {
-                               this.config.setString(dynamicProperty.f0, 
dynamicProperty.f1);
+                       Map<String, String> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
+                       for (Map.Entry<String, String> dynamicProperty : 
dynamicProperties.entrySet()) {
+                               this.config.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
                        }
                }
 
@@ -408,42 +406,34 @@ public class CliFrontend {
                        LOG.info("Creating program plan dump");
 
                        Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
-
                        FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, parallelism);
+                       
+                       String jsonPlan = null;
+                       if (flinkPlan instanceof OptimizedPlan) {
+                               jsonPlan = new 
PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
+                       } else if (flinkPlan instanceof StreamingPlan) {
+                               jsonPlan = ((StreamingPlan) 
flinkPlan).getStreamingPlanAsJSON();
+                       }
 
-                       if (webFrontend) {
-                               this.optimizedPlan = flinkPlan;
-                               this.packagedProgram = program;
-                       } else {
-                               String jsonPlan = null;
-                               if (flinkPlan instanceof OptimizedPlan) {
-                                       jsonPlan = new 
PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
-                               } else if (flinkPlan instanceof StreamingPlan) {
-                                       jsonPlan = ((StreamingPlan) 
flinkPlan).getStreamingPlanAsJSON();
-                               }
-
-                               if (jsonPlan != null) {
-                                       
System.out.println("----------------------- Execution Plan 
-----------------------");
-                                       System.out.println(jsonPlan);
-                                       
System.out.println("--------------------------------------------------------------");
-                               }
-                               else {
-                                       System.out.println("JSON plan could not 
be generated.");
-                               }
+                       if (jsonPlan != null) {
+                               System.out.println("----------------------- 
Execution Plan -----------------------");
+                               System.out.println(jsonPlan);
+                               
System.out.println("--------------------------------------------------------------");
+                       }
+                       else {
+                               System.out.println("JSON plan could not be 
generated.");
+                       }
 
-                               String description = program.getDescription();
-                               if (description != null) {
-                                       System.out.println();
-                                       System.out.println(description);
-                               }
-                               else {
-                                       System.out.println();
-                                       System.out.println("No description 
provided.");
-                               }
+                       String description = program.getDescription();
+                       if (description != null) {
+                               System.out.println();
+                               System.out.println(description);
+                       }
+                       else {
+                               System.out.println();
+                               System.out.println("No description provided.");
                        }
                        return 0;
-
-
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -492,7 +482,7 @@ public class CliFrontend {
 
                        LOG.info("Connecting to JobManager to retrieve list of 
jobs");
                        Future<Object> response = jobManagerGateway.ask(
-                                       getRequestRunningJobsStatus(),
+                                       
JobManagerMessages.getRequestRunningJobsStatus(),
                                        askTimeout);
 
                        Object result;
@@ -792,10 +782,8 @@ public class CliFrontend {
                        yarnCluster.stopAfterJob(result.getJobID());
                        yarnCluster.disconnect();
                }
-
-               if (!webFrontend) {
-                       System.out.println("Job has been submitted with JobID " 
+ result.getJobID());
-               }
+               
+               System.out.println("Job has been submitted with JobID " + 
result.getJobID());
 
                return 0;
        }
@@ -816,7 +804,7 @@ public class CliFrontend {
 
                LOG.info("Program execution finished");
 
-               if (result instanceof JobExecutionResult && !webFrontend) {
+               if (result instanceof JobExecutionResult) {
                        JobExecutionResult execResult = (JobExecutionResult) 
result;
                        System.out.println("Job with JobID " + 
execResult.getJobID() + " has finished.");
                        System.out.println("Job Runtime: " + 
execResult.getNetRuntime() + " ms");
@@ -933,7 +921,6 @@ public class CliFrontend {
         * @param options Command line options which contain JobManager address
         * @param programName Program name
         * @param userParallelism Given user parallelism
-        * @return
         * @throws Exception
         */
        protected Client getClient(
@@ -1035,9 +1022,6 @@ public class CliFrontend {
         * @return The return code for the process.
         */
        private int handleArgException(Exception e) {
-               if (webFrontend) {
-                       throw new RuntimeException(e);
-               }
                LOG.error("Invalid command line arguments." + (e.getMessage() 
== null ? "" : e.getMessage()));
 
                System.out.println(e.getMessage());
@@ -1053,9 +1037,6 @@ public class CliFrontend {
         * @return The return code for the process.
         */
        private int handleError(Throwable t) {
-               if (webFrontend) {
-                       throw new RuntimeException(t);
-               }
                LOG.error("Error while running the command.", t);
 
                System.err.println();
@@ -1080,9 +1061,7 @@ public class CliFrontend {
 
        private void logAndSysout(String message) {
                LOG.info(message);
-               if (!webFrontend) {
-                       System.out.println(message);
-               }
+               System.out.println(message);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1117,9 +1096,6 @@ public class CliFrontend {
                                if (SecurityUtils.isSecurityEnabled()) {
                                        String message = "Secure Hadoop 
environment setup detected. Running in secure context.";
                                        LOG.info(message);
-                                       if (!webFrontend) {
-                                               System.out.println(message);
-                                       }
 
                                        try {
                                                return 
SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
@@ -1165,14 +1141,6 @@ public class CliFrontend {
                }
        }
 
-       public FlinkPlan getFlinkPlan() {
-               return this.optimizedPlan;
-       }
-
-       public PackagedProgram getPackagedProgram() {
-               return this.packagedProgram;
-       }
-
        public void shutdown() {
                ActorSystem sys = this.actorSystem;
                if (sys != null) {
@@ -1251,20 +1219,25 @@ public class CliFrontend {
                return location;
        }
 
-       public static List<Tuple2<String, String>> getDynamicProperties(String 
dynamicPropertiesEncoded) {
-               List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, 
String>>();
-               if(dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
+       public static Map<String, String> getDynamicProperties(String 
dynamicPropertiesEncoded) {
+               if (dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
+                       Map<String, String> properties = new HashMap<>();
+                       
                        String[] propertyLines = 
dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-                       for(String propLine : propertyLines) {
-                               if(propLine == null) {
+                       for (String propLine : propertyLines) {
+                               if (propLine == null) {
                                        continue;
                                }
+                               
                                String[] kv = propLine.split("=");
                                if (kv.length >= 2 && kv[0] != null && kv[1] != 
null && kv[0].length() > 0) {
-                                       ret.add(new Tuple2<String, 
String>(kv[0], kv[1]));
+                                       properties.put(kv[0], kv[1]);
                                }
                        }
+                       return properties;
+               }
+               else {
+                       return Collections.emptyMap();
                }
-               return ret;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index fb644c3..30116af 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -22,11 +22,12 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.test.util.TestBaseUtils;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,7 +36,6 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class FlinkYarnSessionCliTest {
@@ -69,10 +69,8 @@ public class FlinkYarnSessionCliTest {
 
                Assert.assertNotNull(flinkYarnClient);
 
-               List<Tuple2<String, String>> dynProperties = 
CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+               Map<String, String> dynProperties = 
CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
                Assert.assertEquals(1, dynProperties.size());
-               Assert.assertEquals("akka.ask.timeout", 
dynProperties.get(0).f0);
-               Assert.assertEquals("5 min", dynProperties.get(0).f1);
+               Assert.assertEquals("5 min", 
dynProperties.get("akka.ask.timeout"));
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index 993d24e..de7c933 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.configuration.ConfigConstants;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -358,9 +359,9 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
 
                // ------------------ Add dynamic properties to local 
flinkConfiguraton ------
 
-               List<Tuple2<String, String>> dynProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
-               for (Tuple2<String, String> dynProperty : dynProperties) {
-                       flinkConfiguration.setString(dynProperty.f0, 
dynProperty.f1);
+               Map<String, String> dynProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
+               for (Map.Entry<String, String> dynProperty : 
dynProperties.entrySet()) {
+                       flinkConfiguration.setString(dynProperty.getKey(), 
dynProperty.getValue());
                }
 
                // ------------------ Check if the specified queue exists 
--------------

http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index 12f8585..7579d7d 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -244,8 +244,8 @@ abstract class ApplicationMasterBase {
 
     import scala.collection.JavaConverters._
 
-    for(property <- dynamicProperties.asScala){
-      output.println(s"${property.f0}: ${property.f1}")
+    for (property <- dynamicProperties.asScala){
+      output.println(s"${property._1}: ${property._2}")
     }
 
     output.close()
@@ -262,8 +262,8 @@ abstract class ApplicationMasterBase {
     // add dynamic properties to JobManager configuration.
     val dynamicProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
     import scala.collection.JavaConverters._
-    for(property <- dynamicProperties.asScala){
-      configuration.setString(property.f0, property.f1)
+    for (property <- dynamicProperties.asScala){
+      configuration.setString(property._1, property._2)
     }
 
     configuration

Reply via email to