[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146891099 Merging... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1106 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503795 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -68,38 +94,62 @@ object FlinkShell { def startShell( userHost : String, - userPort : Int, + userPort : Int, + executionMode : ExecutionMode.Value, externalJars : Option[Array[String]] = None): Unit ={ println("Starting Flink Shell:") -var cluster: LocalFlinkMiniCluster = null - // either port or userhost not specified by user, create new minicluster -val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) -} else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) +val (host : String, port : Int, cluster : Option[LocalFlinkMiniCluster]) = + +executionMode match { + case ExecutionMode.LOCAL => +val miniCluster = new LocalFlinkMiniCluster(new Configuration, false) +miniCluster.start() +val port = miniCluster.getLeaderRPCPort +println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") +("localhost",port, Some(miniCluster)) + + case ExecutionMode.REMOTE => +if (userHost == "none" || userPort == -1) { + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort, None) +} + + case ExecutionMode.UNDEFINED => +println("Error: please specify execution mode:") +println("[local | remote ]") +return } --- End diff -- I think that this block should be indented more because the result of this block is assigned to (host, port, cluster). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503815 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -68,38 +94,62 @@ object FlinkShell { def startShell( userHost : String, - userPort : Int, + userPort : Int, + executionMode : ExecutionMode.Value, externalJars : Option[Array[String]] = None): Unit ={ println("Starting Flink Shell:") -var cluster: LocalFlinkMiniCluster = null - // either port or userhost not specified by user, create new minicluster -val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) -} else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) +val (host : String, port : Int, cluster : Option[LocalFlinkMiniCluster]) = + +executionMode match { + case ExecutionMode.LOCAL => +val miniCluster = new LocalFlinkMiniCluster(new Configuration, false) +miniCluster.start() +val port = miniCluster.getLeaderRPCPort +println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") +("localhost",port, Some(miniCluster)) + + case ExecutionMode.REMOTE => +if (userHost == "none" || userPort == -1) { + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort, None) +} + + case ExecutionMode.UNDEFINED => +println("Error: please specify execution mode:") +println("[local | remote ]") +return } - -// custom shell -val repl = new FlinkILoop(host, port, externalJars) //new MyILoop(); -repl.settings = new Settings() +try { + // custom shell + val repl : FlinkILoop = --- End diff -- Space after parameter name --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503634 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { --- End diff -- Also, please remove space between `opt[(String)]` and `("addclasspath")`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41504281 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } - var cluster: Option[ForkableFlinkMiniCluster] = None - val parallelism = 4 + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) + +val (c,args) = cluster match{ --- End diff -- Add space after comma --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146519237 Hi @nikste, thanks for update. It looks good to merge except some style issues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146520283 @chiwanpark, white line issues are now fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503746 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -68,38 +94,62 @@ object FlinkShell { def startShell( userHost : String, - userPort : Int, + userPort : Int, + executionMode : ExecutionMode.Value, externalJars : Option[Array[String]] = None): Unit ={ println("Starting Flink Shell:") -var cluster: LocalFlinkMiniCluster = null - // either port or userhost not specified by user, create new minicluster -val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) -} else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) +val (host : String, port : Int, cluster : Option[LocalFlinkMiniCluster]) = + --- End diff -- Unnecessary newline. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503700 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -68,38 +94,62 @@ object FlinkShell { def startShell( userHost : String, - userPort : Int, + userPort : Int, + executionMode : ExecutionMode.Value, externalJars : Option[Array[String]] = None): Unit ={ --- End diff -- Please remove space after parameter name like following: ```scala def startShell( userHost: String, userPort: Int, executionMode: ExecutionMode.value ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503724 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -68,38 +94,62 @@ object FlinkShell { def startShell( userHost : String, - userPort : Int, + userPort : Int, + executionMode : ExecutionMode.Value, externalJars : Option[Array[String]] = None): Unit ={ println("Starting Flink Shell:") -var cluster: LocalFlinkMiniCluster = null - // either port or userhost not specified by user, create new minicluster -val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) -} else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) +val (host : String, port : Int, cluster : Option[LocalFlinkMiniCluster]) = --- End diff -- Please remove space after parameter names. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41504146 --- Diff: docs/apis/scala_shell.md --- @@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to To use the shell with an integrated Flink cluster just execute: ~~~bash -bin/start-scala-shell.sh +bin/start-scala-shell.sh local ~~~ in the root directory of your binary Flink directory. -To use it with a running cluster you can supply the host and port of the JobManager with: +To use it with a running cluster start the scala shell with the keyword remote --- End diff -- Adding comma and quotation marks would be helpful to make readable like following: ``` To use it with a running cluster, start the scala shell with the keyword "remote" ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146507345 Hey @chiwanpark ! I've rebased this to the current master. Would be nice if this can be merged soon if there are no further comments on this PR! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503504 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => --- End diff -- Add space after `,` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503540 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => --- End diff -- Add space after `,` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503568 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { --- End diff -- Duplicated space between `abbr("a")` and `valueName` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41503496 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) --- End diff -- Please insert space after comma `,` and remove space after `(`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41504295 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class ScalaShellLocalStartupITCase extends FunSuite with Matchers { + +/** + * tests flink shell with local setup through startup script in bin folder + */ +test("start flink scala shell with local cluster") { + + val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n" + val in: BufferedReader = new BufferedReader(new StringReader(input + "\n")) + val out: StringWriter = new StringWriter + val baos: ByteArrayOutputStream = new ByteArrayOutputStream + val oldOut: PrintStream = System.out + System.setOut(new PrintStream(baos)) + val args: Array[String] = Array("local") + + //start flink scala shell + FlinkShell.bufferedReader = Some(in); + FlinkShell.main(args) + + --- End diff -- Unnecessary new line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41504243 --- Diff: docs/apis/scala_shell.md --- @@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to To use the shell with an integrated Flink cluster just execute: ~~~bash -bin/start-scala-shell.sh +bin/start-scala-shell.sh local ~~~ in the root directory of your binary Flink directory. -To use it with a running cluster you can supply the host and port of the JobManager with: +To use it with a running cluster start the scala shell with the keyword remote --- End diff -- Ah I had a typo in line notes, grave accent(\`) would be better than quotation marks ("). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146560991 @nikste Sorry for bothering you, but there are still some style issues. Looks good to merge except the issues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41518056 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -67,39 +93,62 @@ object FlinkShell { def startShell( - userHost : String, - userPort : Int, - externalJars : Option[Array[String]] = None): Unit ={ + userHost: String, + userPort: Int, + executionMode: ExecutionMode.Value, + externalJars: Option[Array[String]] = None): Unit ={ println("Starting Flink Shell:") -var cluster: LocalFlinkMiniCluster = null - // either port or userhost not specified by user, create new minicluster -val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) -} else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) -} - -// custom shell -val repl = new FlinkILoop(host, port, externalJars) //new MyILoop(); - -repl.settings = new Settings() - -repl.settings.usejavacp.value = true - -// start scala interpreter shell -repl.process(repl.settings) - -//repl.closeInterpreter() - -if (cluster != null) { - cluster.stop() +val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) = + executionMode match { +case ExecutionMode.LOCAL => + val miniCluster = new LocalFlinkMiniCluster(new Configuration, false) + miniCluster.start() + val port = miniCluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port, Some(miniCluster)) --- End diff -- Need a space after `,`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41518604 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala --- @@ -34,16 +34,16 @@ class FlinkILoop( out0: JPrintWriter) extends ILoopCompat(in0, out0) { - def this(host:String, - port:Int, - externalJars : Option[Array[String]], + def this(host: String, + port: Int, + externalJars: Option[Array[String]], in0: BufferedReader, out: JPrintWriter){ -this(host:String, port:Int, externalJars, Some(in0), out) +this(host: String, port: Int, externalJars, Some(in0), out) } - def this(host:String, port:Int, externalJars : Option[Array[String]]){ -this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true)) + def this(host: String, port: Int, externalJars: Option[Array[String]]){ +this(host: String, port: Int, externalJars , None, new JPrintWriter(Console.out, true)) --- End diff -- Please remove space after `externalJars` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41519871 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } - var cluster: Option[ForkableFlinkMiniCluster] = None - val parallelism = 4 + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) + +val (c, args) = cluster match{ + case Some(cl) => +val arg = Array("remote", + cl.hostname, + Integer.toString(cl.getLeaderRPCPort)) +(cl, arg) + case None => +fail("Cluster creation failed!") +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = Some(in) +FlinkShell.main(args) +baos.flush --- End diff -- If the return type of method or function is `Unit`, adding parenthesis after name is recommended. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41519895 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class ScalaShellLocalStartupITCase extends FunSuite with Matchers { + +/** + * tests flink shell with local setup through startup script in bin folder + */ +test("start flink scala shell with local cluster") { + + val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n" + val in: BufferedReader = new BufferedReader(new StringReader(input + "\n")) + val out: StringWriter = new StringWriter + val baos: ByteArrayOutputStream = new ByteArrayOutputStream + val oldOut: PrintStream = System.out + System.setOut(new PrintStream(baos)) + val args: Array[String] = Array("local") + + //start flink scala shell + FlinkShell.bufferedReader = Some(in); + FlinkShell.main(args) + + baos.flush --- End diff -- Please add parenthesis after `flush`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146533038 Okay, if there is no objection and the travis passes, I'll merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41518461 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x, c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)]("addclasspath") abbr("a") valueName("") action { + case (x, c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + --- End diff -- Unnecessary new line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41518435 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -18,48 +18,74 @@ package org.apache.flink.api.scala +import java.io.{StringWriter, BufferedReader} + +import org.apache.flink.api.common.ExecutionMode + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter._ + object FlinkShell { + object ExecutionMode extends Enumeration { +val UNDEFINED, LOCAL, REMOTE = Value + } + + var bufferedReader: Option[BufferedReader] = None + def main(args: Array[String]) { // scopt, command line arguments case class Config( port: Int = -1, host: String = "none", -externalJars: Option[Array[String]] = None) +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { head ("Flink Scala Shell") - opt[Int] ('p', "port") action { -(x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { -case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { -case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") + cmd("local") action { +(_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x, c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)]("addclasspath") abbr("a") valueName("") action { + case (x, c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") } - // parse arguments -parser.parse (args, Config () ) match { +parser.parse (args, Config ()) match { --- End diff -- Please remove space after `Config` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146573493 @chiwanpark issues should be fixed NOW ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41525031 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } - var cluster: Option[ForkableFlinkMiniCluster] = None - val parallelism = 4 + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) + +val (c, args) = cluster match{ + case Some(cl) => +val arg = Array("remote", + cl.hostname, + Integer.toString(cl.getLeaderRPCPort)) +(cl, arg) + case None => +fail("Cluster creation failed!") +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = Some(in) +FlinkShell.main(args) +baos.flush --- End diff -- Actually the official reasoning is if it has some "side-effects" it should have some parenthesis (which it does in this case) http://docs.scala-lang.org/style/method-invocation.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146568015 @chiwanpark issues should be fixed now.. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41523437 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class ScalaShellLocalStartupITCase extends FunSuite with Matchers { + +/** + * tests flink shell with local setup through startup script in bin folder + */ +test("start flink scala shell with local cluster") { + + val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n" + val in: BufferedReader = new BufferedReader(new StringReader(input + "\n")) + val out: StringWriter = new StringWriter + val baos: ByteArrayOutputStream = new ByteArrayOutputStream + val oldOut: PrintStream = System.out + System.setOut(new PrintStream(baos)) + val args: Array[String] = Array("local") + + //start flink scala shell + FlinkShell.bufferedReader = Some(in); + FlinkShell.main(args) + + baos.flush --- End diff -- There is still unaddressed line. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r41525735 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } - var cluster: Option[ForkableFlinkMiniCluster] = None - val parallelism = 4 + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) + +val (c, args) = cluster match{ + case Some(cl) => +val arg = Array("remote", + cl.hostname, + Integer.toString(cl.getLeaderRPCPort)) +(cl, arg) + case None => +fail("Cluster creation failed!") +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = Some(in) +FlinkShell.main(args) +baos.flush --- End diff -- Yeah, right. And almost all the method of which return type is `Unit` have some side-effects. :) > On Oct 8, 2015, at 5:05 PM, Nikolaas Steenbergenwrote: > > In flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala: > > > + > > +val (c, args) = cluster match{ > > + case Some(cl) => > > +val arg = Array("remote", > > + cl.hostname, > > + Integer.toString(cl.getLeaderRPCPort)) > > +(cl, arg) > > + case None => > > +fail("Cluster creation failed!") > > +} > > + > > +//start scala shell with initialized > > +// buffered reader for testing > > +FlinkShell.bufferedReader = Some(in) > > +FlinkShell.main(args) > > +baos.flush > > Actually the official reasoning is if it has some "side-effects" it should have some parenthesis (which it does in this case) > http://docs.scala-lang.org/style/method-invocation.html > > — > Reply to this email directly or view it on GitHub. > Regards, Chiwan Park --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-146267428 Hi @nikste, because #1197 is merged first into master, we need rebase this on master. Could you rebase this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40898989 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) --- End diff -- I was not aware of that, I would suggest, it should start up with slots for all cores available. What do you think? And how would this be done easiest? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144688685 @tillrohrmann, I changed the code according to your comments. There should be no null values anymore now ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r4088 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} --- End diff -- it would be more
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40890167 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40890119 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40890492 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40892847 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return --- End diff -- Return statements are breaking the control flow. This is not a good style. Would be great to get rid of them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40892778 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40892740 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null --- End diff -- Can we make `cluster` a `val` of type `Option[LocalFlinkMiniCluster]` whose value is the value of the `if-else` expression? That way we avoid `null` values which are evil. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40892899 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40893325 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null) --- End diff -- `null` values are not very scalaesque. Especially since you already have a `Option[ForkableFlinkMiniCluster]`. Why not simply applying a `map` operation on it or extracting it via pattern matching? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144667856 I had some more comments mostly concerning Scala style and one concerning the setup of the `LocalFlinkMiniCluster` which will currently only be started with a single TM and a single slot. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40889842 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; --- End diff -- Can we use an enum for that? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40890027 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + --- End diff -- two line
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40890515 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40893139 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null) +var args : Array[String] = null +if (c != null) { + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} else { + fail("Cluster creation failed!") +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = in +FlinkShell.main(args) +baos.flush + +val output: String = baos.toString +System.setOut(oldOut) + +output should include("Job execution switched to status FINISHED.") +output should include("a\nb") + +output should not include "Error" +output should not include "ERROR" +output should not include "Exception" +output should not include "failed" + } + + var cluster: Option[ForkableFlinkMiniCluster] = None val parallelism = 4 --- End diff -- Can we move the class field to the top of the class definition? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40893417 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null) +var args : Array[String] = null +if (c != null) { --- End diff -- Here again. `null` values and `var` fields can be easily avoided in Scala by simply executing the statements after the else branch within the if branch. This will also lead to a much cleaner control flow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40893947 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) --- End diff -- This will start a singel TM with a single slot. Do we want that? Or do we want to start a single TM with multiple slots? E.g. number of cores. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144345845 @chiwanpark can you check again ? It seems that if I squash both commits the history isn't shown anymore. If they are separated ~~~bash git log --follow ~~~ shows the history of both files. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
GitHub user nikste reopened a pull request: https://github.com/apache/flink/pull/1106 [FLINK-2613] Print usage information for Scala Shell changed startup of scala shell You can merge this pull request into a Git repository by running: $ git pull https://github.com/nikste/flink FLINK-2613-enhanced-help-message Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1106.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1106 commit 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0 Author: Stephan EwenDate: 2015-09-23T10:05:54Z [FLINK-2753] [streaming] [api breaking] Add first parts of new window API for key grouped windows This follows the API design outlined in https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams This is API breaking because it adds new generic type parameters to Java and Scala classes, breaking binary compatibility. commit a606c4a9e83e54397c88a70ea80a00e32ca61a93 Author: Aljoscha Krettek Date: 2015-09-24T14:30:29Z Fix some typos and naming inconsistencies in new Windowing Code commit 05d2138f081ff5fa274dab571b9327f96be693aa Author: Aljoscha Krettek Date: 2015-09-24T14:33:09Z Move window operators and tests to windowing package The api package is also called windowing, this harmonizes the package names. commit 86c45bfa2c760ca99741fa866f30514c7986 Author: Aljoscha Krettek Date: 2015-09-24T14:40:46Z Harmonize generic parameter names in Stream API classes commit 6610caeca4093fced35d478ffc596f935912b4c1 Author: Aljoscha Krettek Date: 2015-09-24T15:39:19Z Add Window parameter to KeyedWindowFunction, move to package windowing commit bfaad37c0aa8ac43a282a8c9cb2e0d64fdf22f72 Author: Aljoscha Krettek Date: 2015-09-24T15:42:15Z Add Count and Delta WindowPolicy commit 3be2dc1aaaeae4cbbfaecaf4998a64f1199260eb Author: Aljoscha Krettek Date: 2015-09-25T09:34:10Z Move delta window functions to package functions.windowing.delta commit dd51c97741b336d3a11e319183537eef864a84fd Author: Aljoscha Krettek Date: 2015-09-25T10:27:35Z [FLINK-2677] Add a general-purpose keyed-window operator commit 0f17755766f2c4ada37bb196981c7dc01016f73c Author: Maximilian Michels Date: 2015-09-28T18:07:18Z [hotfix][streaming] correct return type of execute(..) method commit 0b4dc067fee82654fae3292bf6bf7d59157bf5c0 Author: vasia Date: 2015-09-24T20:08:10Z [FLINK-2561] [gelly] add missing methods to Graph: add-remove edges/vertices, difference, graph creation methods, validate, getTriplets. Add missing utility mappers. commit 9e0284efa90561c88b1bc829b800d89e84477caa Author: vasia Date: 2015-09-24T21:31:38Z [FLINK-2561] [gelly] convert existing tests to use collect instead of files; add tests for newly added operations. Add completeness test: fromCsvReader method is missing. commit 233dab497577f0a9443f772bb10390f6dcc005f1 Author: vasia Date: 2015-09-25T09:20:15Z [FLINK-2561] [gelly] add GraphMetrics Scala example This closes #1183 commit 8d2289ea6981598ab4d2817192b3f171aa9d414d Author: Rerngvit Yanggratoke Date: 2015-09-28T18:34:46Z [FLINK-2768] [docs] Fix wrong java version requirement in quickstart (1.6 -> 1.7 and 6.x -> 7.x) This closes #1188 commit 3b8b4f0f8c0600dc851d676ce1bd7f5ab81cb64f Author: Ufuk Celebi Date: 2015-09-26T12:12:20Z [FLINK-2769] [dashboard] Remove hard coded job server URL This closes #1185 commit 40cbf7e4b038c945916fa8843b60bd4a59a4ae31 Author: Fabian Hueske Date: 2015-09-25T09:47:27Z [FLINK-2756] [scripts] Fix start/stop scripts for paths with spaces This closes #1182 commit e727355e42bd0ad7d403aee703aaf33a68a839d2 Author: Greg Hogan Date: 2015-09-21T19:14:09Z [FLINK-2723] [core] CopyableValue method to copy into new instance This closes #1169 commit 68912126d73b92a07d15ec3f21f9ac922744fb45 Author: chengxiang li Date: 2015-09-24T03:20:10Z [FLINK-2754] Fixed FixedLengthRecordSorter write to multi memory pages issue and add more unit tests. This closes #1178 commit 0a8df6d513fa59d650ff875bdf3a1613d0f14af5 Author: Greg Hogan Date: 2015-09-10T13:35:39Z [FLINK-2653] [runtime] Enable object reuse in MergeIterator This closes #1115 commit 16afb8ec66a2a07733b9090bffe96af1e913bb63 Author: Sachin Goel Date:
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144337406 @chiwanpark I moved the files with git mv, however they still don't show the history. This seems to be more of a gui thing of github according to: http://stackoverflow.com/questions/31304441/i-moved-all-my-project-files-with-git-mv-but-i-have-lost-all-my-history-why However, the history still exists and can be accessed via: ~~~bash git log --follow ~~~ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144339801 @sachingoel0101 just did and I've merged the two commits --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144347584 @chiwanpark this should get fixed after your #1197 gets in first and this is rebased on that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144338050 Hey @nikste , you should rebase this to the current master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144341545 But It seems that there is still same problem. ![screen shot 2015-09-30 at 11 43 59 am](https://cloud.githubusercontent.com/assets/1941681/10189758/acf85620-6768-11e5-8f91-72d7e6fde483.png) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144347730 Okay, I checked that the logs are shown. The problem is resolved. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801479 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; --- End diff -- Unnecessary semicolon. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801674 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; +if(c != null){ + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} +else{ + assert(false) +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = in; --- End diff -- Unnecessary semicolon. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801356 --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + + +import java.io.{StringWriter, BufferedReader} + +import scala.tools.nsc.Settings + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.interpreter._ + + +object FlinkShell { + + val LOCAL = 0; + val REMOTE = 1; + val UNDEFINED = -1; + + var bufferedReader: BufferedReader = null; + + def main(args: Array[String]) { + +// scopt, command line arguments +case class Config( +port: Int = -1, +host: String = "none", +externalJars: Option[Array[String]] = None, +flinkShellExecutionMode : Int = UNDEFINED) + +val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + cmd("local") action { +(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL) + } text("starts Flink scala shell with a local Flink cluster\n") children( +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink\n") +) + + cmd("remote") action { (_, c) => +c.copy(flinkShellExecutionMode = REMOTE) + } text("starts Flink scala shell connecting to a remote cluster\n") children( +arg[String]("") action { (h, c) => + c.copy(host = h) } + text("remote host name as string"), +arg[Int]("") action { (p, c) => + c.copy(port = p) } + text("remote port as integer\n"), +opt[(String)] ("addclasspath") abbr("a") valueName("") action { + case (x,c) => +val xArray = x.split(":") +c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + ) + help("help") abbr("h") text("prints this usage text\n") +} + + +// parse arguments +parser.parse (args, Config () ) match { + case Some(config) => +startShell(config.host, + config.port, + config.flinkShellExecutionMode, + config.externalJars) + + case _ => println("Could not parse program arguments") +} + } + + + def startShell( + userHost : String, + userPort : Int, + executionMode : Int, + externalJars : Option[Array[String]] = None): Unit ={ + +println("Starting Flink Shell:") + +var cluster: LocalFlinkMiniCluster = null + +// either port or userhost not specified by user, create new minicluster +val (host,port) = if (executionMode == LOCAL) { + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + val port = cluster.getLeaderRPCPort + println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") + ("localhost",port) +} else if(executionMode == UNDEFINED) { + println("Error: please specify execution mode:") + println("[local | remote ]") + return +} else if(userHost == "none" || userPort == -1){ + println("Error: or not specified!") + return +} else { + println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") + (userHost, userPort) +} + + +// custom shell +var
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801467 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { +val miniCluster: ForkableFlinkMiniCluster = + new ForkableFlinkMiniCluster(new Configuration, false) + + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); --- End diff -- Unnecessary semicolon. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40802535 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; +if(c != null){ + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} +else{ + assert(false) +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = in; +FlinkShell.main(args) +baos.flush + +val output: String = baos.toString +System.setOut(oldOut) + +assert(output.contains("Job execution switched to status FINISHED.")) +assert(output.contains("a\nb")) + +assert((!output.contains("Error"))) +assert((!output.contains("ERROR"))) +assert((!output.contains("Exception"))) +assert((!output.contains("failed"))) --- End diff -- And scalatest matcher syntax would be better. For example: ```scala output should not include "Error" ``` You can see statements like this at line 162-164 of `ScalaShellITSuite.scala`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801655 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; +if(c != null){ + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} +else{ + assert(false) +} --- End diff -- Could you re-format this `if-else` block? Please add some space between keyword such as `if` or `else` and remove unnecessary newlines. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40802725 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; +if(c != null){ + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} +else{ + assert(false) --- End diff -- Please use `fail` method instead of `assert(false)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40801825 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); +var args : Array[String] = null; +if(c != null){ + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} +else{ + assert(false) +} + +//start scala shell with initialized +// buffered reader for testing +FlinkShell.bufferedReader = in; +FlinkShell.main(args) +baos.flush + +val output: String = baos.toString +System.setOut(oldOut) + +assert(output.contains("Job execution switched to status FINISHED.")) +assert(output.contains("a\nb")) + +assert((!output.contains("Error"))) +assert((!output.contains("ERROR"))) +assert((!output.contains("Exception"))) +assert((!output.contains("failed"))) --- End diff -- These assert statements contain unnecessary parentheses. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144432852 I just reviewed this PR and have a question. What is difference between `ScalaShellLocalStartupITCase` and `ScalaShellITSuite`? Does we need another integration test? BTW, there are some style issues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r40811385 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null) +var args : Array[String] = null +if (c != null) { + args = Array("remote", +c.hostname, +Integer.toString(c.getLeaderRPCPort)) +} else { + fail --- End diff -- Oh, I forgot the usage of `fail` method. `fail(error message)` would be better. The error message could be the reason of failure such as "Creating cluster failed.". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-144456895 @chiwanpark changed fail to give some reasons. This is nicer indeed :+1: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-17780 @chiwanpark, fixed the style issues. `ScalaShellLocalStartupITCase` starts the shell in local mode (i.e. it will create its own local cluster ) being invoked by the "command line" arguments `ScalaShellITSuite` creates a cluster before the scala shell is invoked, thus acting as a "remote" cluster, here general features are tested as well as the invocation with "command line" arguments connecting to the "remote" cluster. As asked by @StephanEwen above. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste closed the pull request at: https://github.com/apache/flink/pull/1106 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-141910091 @fhueske, good point to check if the program was actually executed! @rmetzger fixed the directory structure as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-141401714 @nikste, since you need to change the pull request again, can you also fix the directory name of the classes `flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-141066672 Should be fixed according to your comments @fhueske --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39763329 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -212,6 +212,49 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + --- End diff -- Can you also add a check, that the program was actually executed for example by checking that the correct output was produced? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39763373 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class ScalaShellLocalStartupITCase extends FunSuite with Matchers { + +/** + * tests flink shell with local setup through startup script in bin folder + */ +test("start flink scala shell with local cluster") { + + val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n" --- End diff -- Please check that the program was actually executed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-141131210 Thanks for the update @nikste! I have only one minor thing to add. After that it's good to merge, IMO. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39654142 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { +val miniCluster: ForkableFlinkMiniCluster = --- End diff -- You are creating a new ForkableFlinkMiniCluster here but not using it later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39654250 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala --- @@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { out.toString + stdout } + /** + * tests flink shell startup with remote cluster (starts cluster internally) + */ + test("start flink scala shell with remote cluster") { +val miniCluster: ForkableFlinkMiniCluster = + new ForkableFlinkMiniCluster(new Configuration, false) + + +val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + + "els.print\nError\n:q\n" + +val in: BufferedReader = new BufferedReader( + new StringReader( +input + "\n")) +val out: StringWriter = new StringWriter + +val baos: ByteArrayOutputStream = new ByteArrayOutputStream +val oldOut: PrintStream = System.out +System.setOut(new PrintStream(baos)) +val c = cluster.getOrElse(null); --- End diff -- You are not connecting to the miniCluster you started before. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39653423 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupIT.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +/** + * Created by nikste on 9/16/15. + */ +@RunWith(classOf[JUnitRunner]) +class ScalaShellLocalStartupIT extends FunSuite with Matchers { --- End diff -- We have the convention that integration test classes end with `ITCase` or `ITSuite`. Can you rename this class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1106#discussion_r39652554 --- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupIT.scala --- @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io._ + +import org.junit.runner.RunWith +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.junit.JUnitRunner + + +/** + * Created by nikste on 9/16/15. --- End diff -- Please remove this comment --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-140239647 The tests use the shell scripts (`.sh` or `.bat`) to start the Scala Shell and rely on finding these scripts in the `flink-dist/target` folder. I think this is a bit fragile. Why not directly using the `FlinkShell.scala` class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1106#issuecomment-138863295 Looks good. Is it possible that we cover both cases with tests? 1. Start the shell in local mode, submit some commands. 2. Start a `ForkableFlinkMiniCluster`, and start the shell in mode `remote "localhost" cluster.getJobManagerRpcPort()`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...
GitHub user nikste opened a pull request: https://github.com/apache/flink/pull/1106 [FLINK-2613] Print usage information for Scala Shell changed startup of scala shell You can merge this pull request into a Git repository by running: $ git pull https://github.com/nikste/flink FLINK-2613-enhanced-help-message Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1106.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1106 commit 8aa7e99fed606b3731fbd7b2ef7f14f4d72801fe Author: Nikolaas SteenbergenDate: 2015-09-07T09:36:59Z [FLINK-2613] Print usage information for Scala Shell, changed startup of scala shell --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---