Some code cleanups on Scala Shell

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb6de22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb6de22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb6de22

Branch: refs/heads/master
Commit: 6bb6de22fb6602058430d6d4eebf7cf36e404aca
Parents: 717f881
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue May 26 14:35:53 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu May 28 15:48:17 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/RemoteEnvironment.java       | 14 +----
 .../api/java/ScalaShellRemoteEnvironment.java   | 29 +++--------
 .../org.apache.flink/api/scala/FlinkILoop.scala | 54 ++++++++------------
 .../org.apache.flink/api/scala/FlinkShell.scala | 28 +++++-----
 4 files changed, 44 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index a2f2891..6f84077 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.common.PlanExecutor;
  */
 public class RemoteEnvironment extends ExecutionEnvironment {
        
-       private final String host;
+       protected final String host;
        
-       private final int port;
+       protected final int port;
        
        private final String[] jarFiles;
        
@@ -87,14 +87,4 @@ public class RemoteEnvironment extends ExecutionEnvironment {
                return "Remote Environment (" + this.host + ":" + this.port + " 
- parallelism = " +
                                (getParallelism() == -1 ? "default" : 
getParallelism()) + ") : " + getIdString();
        }
-
-
-       // needed to call execute on ScalaShellRemoteEnvironment
-       public int getPort() {
-               return this.port;
-       }
-
-       public String getHost() {
-               return this.host;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
 
b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
index cb470c9..79f9576 100644
--- 
a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
+++ 
b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
@@ -25,15 +25,12 @@ import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
 
-import java.io.File;
-
 /**
- * ScalaShellRemoteEnvironment references the JobManager through host and port 
parameters,
- * and the Scala Shell (FlinkILoop).
- * Upon calling execute(), it reads compiled lines of the Scala shell, 
aggregates them to a Jar
- * and sends aggregated jar to the JobManager.
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that 
has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called 
this will
+ * use the reference of the ILoop to write the compiled classes of the current 
session to
+ * a Jar file and submit these with the program.
  */
-
 public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 
        // reference to Scala Shell, for access to virtual directory
@@ -62,22 +59,12 @@ public class ScalaShellRemoteEnvironment extends 
RemoteEnvironment {
        public JobExecutionResult execute(String jobName) throws Exception {
                Plan p = createProgramPlan(jobName);
 
-               // write virtual files to disk first
-               JarHelper jh = new JarHelper();
-
-               flinkILoop.writeFilesToDisk();
-
-               // jarr up.
-               File inFile = new File( 
flinkILoop.getTmpDirShell().getAbsolutePath());
-               File outFile = new File( 
flinkILoop.getTmpJarShell().getAbsolutePath());
-
-               jh.jarDir(inFile, outFile);
-
-               String[] jarFiles = {outFile.getAbsolutePath()};
+               String jarFile = 
flinkILoop.writeFilesToDisk().getAbsolutePath();
 
                // call "traditional" execution methods
-               PlanExecutor executor = 
PlanExecutor.createRemoteExecutor(super.getHost(), super.getPort(), jarFiles);
+               PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, 
port, jarFile);
+
                
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
                return executor.executePlan(p);
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 0de4953..83d0c72 100644
--- 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -20,17 +20,17 @@ package org.apache.flink.api.scala
 
 import java.io.{BufferedReader, File, FileOutputStream}
 
-import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
 import org.apache.flink.util.AbstractID
 
 
-class FlinkILoop(val host: String,
-                 val port: Int,
-                 in0: Option[BufferedReader],
-                 out0: JPrintWriter)
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
   extends ILoop(in0, out0) {
 
   def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
@@ -52,11 +52,6 @@ class FlinkILoop(val host: String,
     scalaEnv
   }
 
-
-  /**
-   * CUSTOM START METHODS OVERRIDE:
-   */
-
   addThunk {
     intp.beQuietDuring {
       // automatically imports the flink scala api
@@ -68,7 +63,6 @@ class FlinkILoop(val host: String,
   }
 
 
-
   /**
    * creates a temporary directory to store compiled console files
    */
@@ -96,18 +90,20 @@ class FlinkILoop(val host: String,
 
 
   /**
-   * writes contents of the compiled lines that have been executed in the 
shell into a
-   * "physical directory": creates a unique temporary directory
+   * Packages the compiled classes of the current shell session into a Jar 
file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
    */
-  def writeFilesToDisk(): Unit = {
+  def writeFilesToDisk(): File = {
     val vd = intp.virtualDirectory
 
-    var vdIt = vd.iterator
+    val vdIt = vd.iterator
 
     for (fi <- vdIt) {
       if (fi.isDirectory) {
 
-        var fiIt = fi.iterator
+        val fiIt = fi.iterator
 
         for (f <- fiIt) {
 
@@ -128,6 +124,14 @@ class FlinkILoop(val host: String,
         }
       }
     }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
   }
 
   /**
@@ -183,20 +187,4 @@ NOTE: Use the prebound Execution Environment "env" to read 
data and execute your
 HINT: You can use print() on a DataSet to print the contents to this shell.
       """)
   }
-
-  //  getter functions:
-  // get (root temporary folder)
-  def getTmpDirBase(): File = {
-    return (this.tmpDirBase);
-  }
-
-  // get shell folder name inside tmp dir
-  def getTmpDirShell(): File = {
-    return (this.tmpDirShell)
-  }
-
-  // get tmp jar file name
-  def getTmpJarShell(): File = {
-    return (this.tmpJarShell)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 90615ec..7ad1d2f 100644
--- 
a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ 
b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 
-
-
 object FlinkShell {
 
   def main(args: Array[String]) {
@@ -33,28 +31,29 @@ object FlinkShell {
     // scopt, command line arguments
     case class Config(port: Int = -1,
                       host: String = "none")
-    val parser = new scopt.OptionParser[Config] ("scopt") {
-      head ("scopt", "3.x")
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
       opt[Int] ('p', "port") action {
         (x, c) =>
           c.copy (port = x)
-      } text ("port specifies port of running JobManager")
+      } text("port specifies port of running JobManager")
+
       opt[(String)] ('h',"host") action {
         case (x, c) =>
           c.copy (host = x)
-      }  text ("host specifies host name of running JobManager")
-      help("help") text("prints this usage text")
+      }  text("host specifies host name of running JobManager")
 
+      help("help") text("prints this usage text")
     }
 
 
     // parse arguments
-    parser.parse (args, Config () ) map {
-      config =>
-        startShell(config.host,config.port);
-    } getOrElse {
-      // arguments are bad, usage message will have been displayed
-      println("Could not parse program arguments")
+    parser.parse (args, Config () ) match {
+      case Some(config) =>
+        startShell(config.host,config.port)
+
+      case _ => println("Could not parse program arguments")
     }
   }
 
@@ -65,8 +64,7 @@ object FlinkShell {
     var cluster: LocalFlinkMiniCluster = null
 
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 )
-    {
+    val (host,port) = if (userHost == "none" || userPort == -1 ) {
       println("Creating new local server")
       cluster = new LocalFlinkMiniCluster(new Configuration, false)
       ("localhost",cluster.getJobManagerRPCPort)

Reply via email to