[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-09 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146891099
  
Merging...




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1106




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503795
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -68,38 +94,62 @@ object FlinkShell {
 
   def startShell(
       userHost : String,
-      userPort : Int,
+      userPort : Int,
+      executionMode : ExecutionMode.Value,
       externalJars : Option[Array[String]] = None): Unit ={
 
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
+    val (host : String, port : Int, cluster :  Option[LocalFlinkMiniCluster]) =
+
+    executionMode match {
+      case ExecutionMode.LOCAL =>
+        val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+        miniCluster.start()
+        val port = miniCluster.getLeaderRPCPort
+        println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+        ("localhost",port, Some(miniCluster))
+
+      case ExecutionMode.REMOTE =>
+        if (userHost == "none" || userPort == -1) {
+          println("Error: <host> or <port> not specified!")
+          return
+        } else {
+          println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+          (userHost, userPort, None)
+        }
+
+      case ExecutionMode.UNDEFINED =>
+        println("Error: please specify execution mode:")
+        println("[local | remote <host> <port>]")
+        return
     }
--- End diff --

I think this block should be indented one level more, because its result is assigned to `(host, port, cluster)`. A condensed sketch of the suggested layout follows.
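
A condensed sketch, taken from the diff above, with the `match` indented so it clearly reads as the right-hand side of the assignment (the `UNDEFINED` case is omitted here):
```scala
val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
  executionMode match {
    case ExecutionMode.LOCAL =>
      // the mini cluster is returned so the caller can stop it later
      val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
      miniCluster.start()
      ("localhost", miniCluster.getLeaderRPCPort, Some(miniCluster))
    case ExecutionMode.REMOTE =>
      (userHost, userPort, None)
  }
```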




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503815
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -68,38 +94,62 @@ object FlinkShell {
 
   def startShell(
       userHost : String,
-      userPort : Int,
+      userPort : Int,
+      executionMode : ExecutionMode.Value,
       externalJars : Option[Array[String]] = None): Unit ={
 
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
+    val (host : String, port : Int, cluster :  Option[LocalFlinkMiniCluster]) =
+
+    executionMode match {
+      case ExecutionMode.LOCAL =>
+        val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+        miniCluster.start()
+        val port = miniCluster.getLeaderRPCPort
+        println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+        ("localhost",port, Some(miniCluster))
+
+      case ExecutionMode.REMOTE =>
+        if (userHost == "none" || userPort == -1) {
+          println("Error: <host> or <port> not specified!")
+          return
+        } else {
+          println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
+          (userHost, userPort, None)
+        }
+
+      case ExecutionMode.UNDEFINED =>
+        println("Error: please specify execution mode:")
+        println("[local | remote <host> <port>]")
+        return
     }
-
-    // custom shell
-    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
 
-    repl.settings = new Settings()
+    try {
+      // custom shell
+      val repl : FlinkILoop =
--- End diff --

Space after parameter name
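
That is, the space before the colon should be dropped; a before/after sketch (initializer elided):
```scala
// before
val repl : FlinkILoop = ...
// after
val repl: FlinkILoop = ...
```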




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503634
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x,c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+      )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)] ("addclasspath") abbr("a")  valueName("<path/to/jar>") action {
--- End diff --

Also, please remove space between `opt[(String)]` and `("addclasspath")`.
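
For reference, a hedged sketch of how the corrected declaration might read, using the same scopt DSL as the diff above (the `valueName` placeholder is an assumption):
```scala
opt[String]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
  case (x, c) =>
    c.copy(externalJars = Option(x.split(":")))
} text("specifies additional jars to be used in Flink")
```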




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41504281
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala ---
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     out.toString + stdout
   }
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+      "els.print\nError\n:q\n"
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c,args) = cluster match{
--- End diff --

Add space after comma




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146519237
  
Hi @nikste, thanks for the update. It looks good to merge except for some style issues.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146520283
  
@chiwanpark, the blank-line issues are now fixed.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503746
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -68,38 +94,62 @@ object FlinkShell {
 
   def startShell(
       userHost : String,
-      userPort : Int,
+      userPort : Int,
+      executionMode : ExecutionMode.Value,
       externalJars : Option[Array[String]] = None): Unit ={
 
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
+    val (host : String, port : Int, cluster :  Option[LocalFlinkMiniCluster]) =
+
--- End diff --

Unnecessary newline.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503700
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -68,38 +94,62 @@ object FlinkShell {
 
   def startShell(
       userHost : String,
-      userPort : Int,
+      userPort : Int,
+      executionMode : ExecutionMode.Value,
       externalJars : Option[Array[String]] = None): Unit ={
--- End diff --

Please remove the space after the parameter names, like the following:
```scala
def startShell(
  userHost: String,
  userPort: Int,
  executionMode: ExecutionMode.Value
```




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503724
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -68,38 +94,62 @@ object FlinkShell {
 
   def startShell(
       userHost : String,
-      userPort : Int,
+      userPort : Int,
+      executionMode : ExecutionMode.Value,
       externalJars : Option[Array[String]] = None): Unit ={
 
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
+    val (host : String, port : Int, cluster :  Option[LocalFlinkMiniCluster]) =
--- End diff --

Please remove space after parameter names.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41504146
  
--- Diff: docs/apis/scala_shell.md ---
@@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to
 To use the shell with an integrated Flink cluster just execute:
 
 ~~~bash
-bin/start-scala-shell.sh
+bin/start-scala-shell.sh local
 ~~~
 
 in the root directory of your binary Flink directory.
 
-To use it with a running cluster you can supply the host and port of the JobManager with:
+To use it with a running cluster start the scala shell with the keyword remote
--- End diff --

Adding a comma and quotation marks would make this more readable, like the following:
```
To use it with a running cluster, start the scala shell with the keyword "remote"
```




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146507345
  
Hey @chiwanpark!
I've rebased this onto the current master. It would be nice if this could be merged soon, if there are no further comments on this PR!




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503504
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x,c) =>
--- End diff --

Add space after `,`




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503540
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x,c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+      )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)] ("addclasspath") abbr("a")  valueName("<path/to/jar>") action {
+          case (x,c) =>
--- End diff --

Add space after `,`




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503568
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x,c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+      )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)] ("addclasspath") abbr("a")  valueName("<path/to/jar>") action {
--- End diff --

Duplicated space between `abbr("a")` and `valueName`




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41503496
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode : ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
--- End diff --

Please insert a space after the comma `,` and remove the space after `(`.
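
A before/after sketch of the suggested spacing:
```scala
// before
(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
// after
(_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
```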




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41504295
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+  /**
+   * tests flink shell with local setup through startup script in bin folder
+   */
+  test("start flink scala shell with local cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+    val out: StringWriter = new StringWriter
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+    val args: Array[String] = Array("local")
+
+    //start flink scala shell
+    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.main(args)
+
+
--- End diff --

Unnecessary new line




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41504243
  
--- Diff: docs/apis/scala_shell.md ---
@@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to
 To use the shell with an integrated Flink cluster just execute:
 
 ~~~bash
-bin/start-scala-shell.sh
+bin/start-scala-shell.sh local
 ~~~
 
 in the root directory of your binary Flink directory.
 
-To use it with a running cluster you can supply the host and port of the JobManager with:
+To use it with a running cluster start the scala shell with the keyword remote
--- End diff --

Ah, I had a typo in my line notes: a grave accent (`) would be better than quotation marks (").
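
Combining both notes, the suggested doc line would read:
```
To use it with a running cluster, start the scala shell with the keyword `remote`
```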




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146560991
  
@nikste Sorry to bother you, but there are still a few style issues. Looks good to merge once they are addressed.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41518056
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -67,39 +93,62 @@ object FlinkShell {
 
 
   def startShell(
-      userHost : String,
-      userPort : Int,
-      externalJars : Option[Array[String]] = None): Unit ={
+      userHost: String,
+      userPort: Int,
+      executionMode: ExecutionMode.Value,
+      externalJars: Option[Array[String]] = None): Unit ={
 
     println("Starting Flink Shell:")
 
-    var cluster: LocalFlinkMiniCluster = null
-
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 ) {
-      println("Creating new local server")
-      cluster = new LocalFlinkMiniCluster(new Configuration, false)
-      cluster.start()
-      ("localhost",cluster.getLeaderRPCPort)
-    } else {
-      println(s"Connecting to remote server (host: $userHost, port: $userPort).")
-      (userHost, userPort)
-    }
-
-    // custom shell
-    val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
-
-    repl.settings = new Settings()
-
-    repl.settings.usejavacp.value = true
-
-    // start scala interpreter shell
-    repl.process(repl.settings)
-
-    //repl.closeInterpreter()
-
-    if (cluster != null) {
-      cluster.stop()
+    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
+      executionMode match {
+        case ExecutionMode.LOCAL =>
+          val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
+          miniCluster.start()
+          val port = miniCluster.getLeaderRPCPort
+          println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
+          ("localhost",port, Some(miniCluster))
--- End diff --

Need a space after `,`.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41518604
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---
@@ -34,16 +34,16 @@ class FlinkILoop(
     out0: JPrintWriter)
   extends ILoopCompat(in0, out0) {
 
-  def this(host:String,
-           port:Int,
-           externalJars : Option[Array[String]],
+  def this(host: String,
+           port: Int,
+           externalJars: Option[Array[String]],
            in0: BufferedReader,
            out: JPrintWriter){
-    this(host:String, port:Int, externalJars, Some(in0), out)
+    this(host: String, port: Int, externalJars, Some(in0), out)
   }
 
-  def this(host:String, port:Int, externalJars : Option[Array[String]]){
-    this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
+  def this(host: String, port: Int, externalJars: Option[Array[String]]){
+    this(host: String, port: Int, externalJars , None, new JPrintWriter(Console.out, true))
--- End diff --

Please remove space after `externalJars`
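
A sketch of the auxiliary constructor with that space removed (the type ascriptions in the delegating call are legal but redundant, so they are dropped here as well):
```scala
def this(host: String, port: Int, externalJars: Option[Array[String]]) {
  this(host, port, externalJars, None, new JPrintWriter(Console.out, true))
}
```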




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41519871
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala ---
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     out.toString + stdout
   }
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+      "els.print\nError\n:q\n"
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match{
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        fail("Cluster creation failed!")
+    }
+
+    //start scala shell with initialized
+    // buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush
--- End diff --

If the return type of a method or function is `Unit`, adding parentheses after the name is recommended.
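
A minimal sketch of the convention, using the names from the test above:
```scala
// flush() performs I/O, i.e. has a side effect, so it takes parentheses
baos.flush()

// a side-effect-free accessor may omit them
val captured = baos.toString
```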




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41519895
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+  /**
+   * tests flink shell with local setup through startup script in bin folder
+   */
+  test("start flink scala shell with local cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+    val out: StringWriter = new StringWriter
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+    val args: Array[String] = Array("local")
+
+    //start flink scala shell
+    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.main(args)
+
+    baos.flush
--- End diff --

Please add parentheses after `flush`.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146533038
  
Okay, if there are no objections and Travis passes, I'll merge this.




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41518461
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+      )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink")
+
--- End diff --

Unnecessary new line




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41518435
  
--- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
@@ -18,48 +18,74 @@
 
 package org.apache.flink.api.scala
 
+import java.io.{StringWriter, BufferedReader}
+
+import org.apache.flink.api.common.ExecutionMode
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
 import scala.tools.nsc.Settings
 
+import scala.tools.nsc.interpreter._
+
 
 object FlinkShell {
 
+  object ExecutionMode extends Enumeration {
+    val UNDEFINED, LOCAL, REMOTE = Value
+  }
+
+  var bufferedReader: Option[BufferedReader] = None
+
   def main(args: Array[String]) {
 
     // scopt, command line arguments
     case class Config(
         port: Int = -1,
         host: String = "none",
-        externalJars: Option[Array[String]] = None)
+        externalJars: Option[Array[String]] = None,
+        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
       head ("Flink Scala Shell")
 
-      opt[Int] ('p', "port") action {
-        (x, c) =>
-          c.copy (port = x)
-      } text("port specifies port of running JobManager")
-
-      opt[(String)] ('h',"host") action {
-        case (x, c) =>
-          c.copy (host = x)
-      }  text("host specifies host name of running JobManager")
-
-      opt[(String)] ('a',"addclasspath") action {
-        case (x,c) =>
-          val xArray = x.split(":")
-          c.copy(externalJars = Option(xArray))
-      } text("specifies additional jars to be used in Flink")
-
-      help("help") text("prints this usage text")
+      cmd("local") action {
+        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+      } text("starts Flink scala shell with a local Flink cluster\n") children(
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink\n")
+      )
+
+      cmd("remote") action { (_, c) =>
+        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        arg[String]("<host>") action { (h, c) =>
+          c.copy(host = h) }
+          text("remote host name as string"),
+        arg[Int]("<port>") action { (p, c) =>
+          c.copy(port = p) }
+          text("remote port as integer\n"),
+        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+          } text("specifies additional jars to be used in Flink")
+
+      )
+      help("help") abbr("h") text("prints this usage text\n")
     }
 
-
     // parse arguments
-    parser.parse (args, Config () ) match {
+    parser.parse (args, Config ()) match {
--- End diff --

Please remove space after `Config`




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146573493
  
@chiwanpark issues should be fixed NOW ;)




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41525031
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala ---
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     out.toString + stdout
   }
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+      "els.print\nError\n:q\n"
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match{
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        fail("Cluster creation failed!")
+    }
+
+    //start scala shell with initialized
+    // buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush
--- End diff --

Actually, the official reasoning is that if a method has side effects it should have parentheses (which it does in this case):
http://docs.scala-lang.org/style/method-invocation.html




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146568015
  
@chiwanpark issues should be fixed now..




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41523437
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+  /**
+   * tests flink shell with local setup through startup script in bin folder
+   */
+  test("start flink scala shell with local cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
+    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
+    val out: StringWriter = new StringWriter
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+    val args: Array[String] = Array("local")
+
+    //start flink scala shell
+    FlinkShell.bufferedReader = Some(in);
+    FlinkShell.main(args)
+
+    baos.flush
--- End diff --

There is still an unaddressed line. :)




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-08 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r41525735
  
--- Diff: flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala ---
@@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     out.toString + stdout
   }
 
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
+  /**
+   * tests flink shell startup with remote cluster (starts cluster internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+    val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+      "els.print\nError\n:q\n"
+
+    val in: BufferedReader = new BufferedReader(
+      new StringReader(
+        input + "\n"))
+    val out: StringWriter = new StringWriter
+
+    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val oldOut: PrintStream = System.out
+    System.setOut(new PrintStream(baos))
+
+    val (c, args) = cluster match{
+      case Some(cl) =>
+        val arg = Array("remote",
+          cl.hostname,
+          Integer.toString(cl.getLeaderRPCPort))
+        (cl, arg)
+      case None =>
+        fail("Cluster creation failed!")
+    }
+
+    //start scala shell with initialized
+    // buffered reader for testing
+    FlinkShell.bufferedReader = Some(in)
+    FlinkShell.main(args)
+    baos.flush
--- End diff --

Yeah, right. And almost all methods whose return type is `Unit` have side effects. :)


Regards,
Chiwan Park








[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-07 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-146267428
  
Hi @nikste, because #1197 was merged into master first, we need to rebase this PR onto master. Could you rebase it?




[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40898989
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
--- End diff --

I was not aware of that. I would suggest that it start up with one slot per 
available core.
What do you think? And what would be the easiest way to do this?
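
One possible sketch, assuming the `ConfigConstants` key of this Flink version, 
would be to configure the slots before creating the mini cluster:

~~~scala
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

// Give the single TM one task slot per available core.
val config = new Configuration()
config.setInteger(
  ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
  Runtime.getRuntime.availableProcessors())
val cluster = new LocalFlinkMiniCluster(config, false)
cluster.start()
~~~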



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144688685
  
@tillrohrmann, I changed the code according to your comments. There should 
be no `null` values left now ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r4088
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
--- End diff --

it would be more 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40890167
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40890119
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40890492
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40892847
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
--- End diff --

Return statements break the control flow, which is not good style. It would 
be great to get rid of them.
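
For illustration, a hedged sketch (hypothetical `resolveHostPort` helper; the 
constants and messages come from the diff above) of how every branch can 
yield a value so no early `return` is needed:

~~~scala
val UNDEFINED = -1 // constant from the diff above

// Every branch produces a value; the caller matches on the Option
// instead of relying on an early return.
def resolveHostPort(mode: Int, userHost: String, userPort: Int): Option[(String, Int)] =
  if (mode == UNDEFINED) {
    println("Error: please specify execution mode:")
    println("[local | remote <host> <port>]")
    None
  } else if (userHost == "none" || userPort == -1) {
    println("Error: <host> or <port> not specified!")
    None
  } else {
    Some((userHost, userPort))
  }
~~~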


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40892778
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40892740
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
--- End diff --

Can we make `cluster` a `val` of type `Option[LocalFlinkMiniCluster]` whose 
value is the result of the `if-else` expression? That way we avoid `null` 
values, which are evil.
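
For illustration, a minimal sketch of that change in the context of the 
quoted `startShell` (`executionMode` and `LOCAL` as in the diff above):

~~~scala
// `cluster` is bound exactly once, to the value of the if-else expression,
// so no `null` initializer is required.
val cluster: Option[LocalFlinkMiniCluster] =
  if (executionMode == LOCAL) {
    val c = new LocalFlinkMiniCluster(new Configuration, false)
    c.start()
    Some(c)
  } else {
    None
  }
~~~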


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40892899
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40893325
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null)
--- End diff --

`null` values are not very scalaesque, especially since you already have an 
`Option[ForkableFlinkMiniCluster]`. Why not simply apply a `map` operation 
on it or extract it via pattern matching?
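
For illustration, a sketch of the pattern-matching variant (essentially what 
a later revision of this test does):

~~~scala
// Extract the cluster via pattern matching; ScalaTest's `fail` returns
// Nothing, so the whole match still types as Array[String].
val args = cluster match {
  case Some(c) =>
    Array("remote", c.hostname, Integer.toString(c.getLeaderRPCPort))
  case None =>
    fail("Cluster creation failed!")
}
~~~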


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144667856
  
I had some more comments, mostly concerning Scala style, and one concerning 
the setup of the `LocalFlinkMiniCluster`, which will currently only be 
started with a single TM and a single slot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40889842
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
--- End diff --

Can we use an enum for that?
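
For illustration, a minimal sketch of one way to do this with 
`scala.Enumeration`:

~~~scala
// Replaces the three Int constants with a typed enumeration.
object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE = Value
}

// Values are then typed as ExecutionMode.Value and can be pattern-matched:
def describe(mode: ExecutionMode.Value): String = mode match {
  case ExecutionMode.LOCAL  => "local mini cluster"
  case ExecutionMode.REMOTE => "remote cluster"
  case _                    => "undefined"
}
~~~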


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40890027
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
--- End diff --

two line breaks are unnecessary here.

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40890515
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40893139
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null)
+var args : Array[String] = null
+if (c != null) {
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+} else {
+  fail("Cluster creation failed!")
+}
+
+//start scala shell with initialized
+// buffered reader for testing
+FlinkShell.bufferedReader = in
+FlinkShell.main(args)
+baos.flush
+
+val output: String = baos.toString
+System.setOut(oldOut)
+
+output should include("Job execution switched to status FINISHED.")
+output should include("a\nb")
+
+output should not include "Error"
+output should not include "ERROR"
+output should not include "Exception"
+output should not include "failed"
+  }
+
+
   var cluster: Option[ForkableFlinkMiniCluster] = None
   val parallelism = 4
--- End diff --

Can we move the class fields to the top of the class definition?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40893417
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null)
+var args : Array[String] = null
+if (c != null) {
--- End diff --

Here again: `null` values and `var` fields can easily be avoided in Scala 
by moving the statements that follow the if-else into the defined branch. 
This also leads to a much cleaner control flow.
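
For illustration, a sketch of that restructuring applied to the quoted test 
code:

~~~scala
// The statements that followed the if-else move into the defined branch,
// so neither `var args` nor a `null` check is needed.
cluster match {
  case Some(c) =>
    val args = Array("remote", c.hostname, Integer.toString(c.getLeaderRPCPort))
    FlinkShell.bufferedReader = in
    FlinkShell.main(args)
  case None =>
    fail("Cluster creation failed!")
}
~~~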


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-10-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40893947
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode 
= LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") 
children(
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
--- End diff --

This will start a single TM with a single slot. Do we want that? Or do we 
want to start a single TM with multiple slots, e.g. the number of cores?
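
If we wanted one slot per core, a hedged sketch of what that could look like (assuming the standard `ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS` key; this is not part of the PR):

```scala
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

// One task slot per available core instead of the single default slot.
val config = new Configuration()
config.setInteger(
  ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
  Runtime.getRuntime.availableProcessors())
val miniCluster = new LocalFlinkMiniCluster(config, false)
miniCluster.start()
```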


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144345845
  
 @chiwanpark can you check again? It seems that if I squash both commits, 
the history isn't shown anymore.

If they are separated
~~~bash
git log --follow 
~~~

shows the history of both files.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
GitHub user nikste reopened a pull request:

https://github.com/apache/flink/pull/1106

[FLINK-2613] Print usage information for Scala Shell

changed startup of scala shell

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nikste/flink FLINK-2613-enhanced-help-message

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1106


commit 7e20299c4e2d9cc78c36f90bdf0acdbaf72062b0
Author: Stephan Ewen 
Date:   2015-09-23T10:05:54Z

[FLINK-2753] [streaming] [api breaking] Add first parts of new window API 
for key grouped windows

This follows the API design outlined in 
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

This is API breaking because it adds new generic type parameters to Java 
and Scala classes, breaking binary compatibility.

commit a606c4a9e83e54397c88a70ea80a00e32ca61a93
Author: Aljoscha Krettek 
Date:   2015-09-24T14:30:29Z

Fix some typos and naming inconsistencies in new Windowing Code

commit 05d2138f081ff5fa274dab571b9327f96be693aa
Author: Aljoscha Krettek 
Date:   2015-09-24T14:33:09Z

Move window operators and tests to windowing package

The api package is also called windowing, this harmonizes the package
names.

commit 86c45bfa2c760ca99741fa866f30514c7986
Author: Aljoscha Krettek 
Date:   2015-09-24T14:40:46Z

Harmonize generic parameter names in Stream API classes

commit 6610caeca4093fced35d478ffc596f935912b4c1
Author: Aljoscha Krettek 
Date:   2015-09-24T15:39:19Z

Add Window parameter to KeyedWindowFunction, move to package windowing

commit bfaad37c0aa8ac43a282a8c9cb2e0d64fdf22f72
Author: Aljoscha Krettek 
Date:   2015-09-24T15:42:15Z

Add Count and Delta  WindowPolicy

commit 3be2dc1aaaeae4cbbfaecaf4998a64f1199260eb
Author: Aljoscha Krettek 
Date:   2015-09-25T09:34:10Z

Move delta window functions to package functions.windowing.delta

commit dd51c97741b336d3a11e319183537eef864a84fd
Author: Aljoscha Krettek 
Date:   2015-09-25T10:27:35Z

[FLINK-2677] Add a general-purpose keyed-window operator

commit 0f17755766f2c4ada37bb196981c7dc01016f73c
Author: Maximilian Michels 
Date:   2015-09-28T18:07:18Z

[hotfix][streaming] correct return type of execute(..) method

commit 0b4dc067fee82654fae3292bf6bf7d59157bf5c0
Author: vasia 
Date:   2015-09-24T20:08:10Z

[FLINK-2561] [gelly] add missing methods to Graph:
add-remove edges/vertices, difference, graph creation methods,
validate, getTriplets. Add missing utility mappers.

commit 9e0284efa90561c88b1bc829b800d89e84477caa
Author: vasia 
Date:   2015-09-24T21:31:38Z

[FLINK-2561] [gelly] convert existing tests to use collect instead of files;
add tests for newly added operations.

Add completeness test: fromCsvReader method is missing.

commit 233dab497577f0a9443f772bb10390f6dcc005f1
Author: vasia 
Date:   2015-09-25T09:20:15Z

[FLINK-2561] [gelly] add GraphMetrics Scala example

This closes #1183

commit 8d2289ea6981598ab4d2817192b3f171aa9d414d
Author: Rerngvit Yanggratoke 
Date:   2015-09-28T18:34:46Z

[FLINK-2768] [docs] Fix wrong java version requirement in quickstart (1.6 
-> 1.7 and 6.x -> 7.x)

This closes #1188

commit 3b8b4f0f8c0600dc851d676ce1bd7f5ab81cb64f
Author: Ufuk Celebi 
Date:   2015-09-26T12:12:20Z

[FLINK-2769] [dashboard] Remove hard coded job server URL

This closes #1185

commit 40cbf7e4b038c945916fa8843b60bd4a59a4ae31
Author: Fabian Hueske 
Date:   2015-09-25T09:47:27Z

[FLINK-2756] [scripts] Fix start/stop scripts for paths with spaces

This closes #1182

commit e727355e42bd0ad7d403aee703aaf33a68a839d2
Author: Greg Hogan 
Date:   2015-09-21T19:14:09Z

[FLINK-2723] [core] CopyableValue method to copy into new instance

This closes #1169

commit 68912126d73b92a07d15ec3f21f9ac922744fb45
Author: chengxiang li 
Date:   2015-09-24T03:20:10Z

[FLINK-2754] Fixed FixedLengthRecordSorter write to multi memory pages 
issue and add more unit tests.

This closes #1178

commit 0a8df6d513fa59d650ff875bdf3a1613d0f14af5
Author: Greg Hogan 
Date:   2015-09-10T13:35:39Z

[FLINK-2653] [runtime] Enable object reuse in MergeIterator

This closes #1115

commit 16afb8ec66a2a07733b9090bffe96af1e913bb63
Author: Sachin Goel 
Date:  

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144337406
  
@chiwanpark I moved the files with `git mv`; however, they still don't show 
the history.
This seems to be more of a GitHub GUI issue, according to:


http://stackoverflow.com/questions/31304441/i-moved-all-my-project-files-with-git-mv-but-i-have-lost-all-my-history-why

However, the history still exists and can be accessed via:
~~~bash
git log --follow 
~~~



---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144339801
  
@sachingoel0101 just did and I've merged the two commits


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144347584
  
@chiwanpark this should get fixed after your #1197 gets in first and this 
is rebased on that.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144338050
  
Hey @nikste, you should rebase this onto the current master. 


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144341545
  
But it seems that there is still the same problem.
![screen shot 2015-09-30 at 11 43 59 am](https://cloud.githubusercontent.com/assets/1941681/10189758/acf85620-6768-11e5-8f91-72d7e6fde483.png)



---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144347730
  
Okay, I checked that the logs are shown. The problem is resolved.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801479
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
--- End diff --

Unnecessary semicolon.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801674
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
+if(c != null){
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+}
+else{
+  assert(false)
+}
+
+//start scala shell with initialized
+// buffered reader for testing
+FlinkShell.bufferedReader = in;
--- End diff --

Unnecessary semicolon.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801356
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+
+import java.io.{StringWriter, BufferedReader}
+
+import scala.tools.nsc.Settings
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+
+import scala.tools.nsc.interpreter._
+
+
+object FlinkShell {
+
+  val LOCAL = 0;
+  val REMOTE = 1;
+  val UNDEFINED = -1;
+
+  var bufferedReader: BufferedReader = null;
+
+  def main(args: Array[String]) {
+
+// scopt, command line arguments
+case class Config(
+port: Int = -1,
+host: String = "none",
+externalJars: Option[Array[String]] = None,
+flinkShellExecutionMode : Int = UNDEFINED)
+
+val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+  head ("Flink Scala Shell")
+
+  cmd("local") action {
+(_,c) => c.copy( host = "none", port = -1, flinkShellExecutionMode = LOCAL)
+  } text("starts Flink scala shell with a local Flink cluster\n") children(
+opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink\n")
+)
+
+  cmd("remote") action { (_, c) =>
+c.copy(flinkShellExecutionMode = REMOTE)
+  } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
+arg[String]("") action { (h, c) =>
+  c.copy(host = h) }
+  text("remote host name as string"),
+arg[Int]("") action { (p, c) =>
+  c.copy(port = p) }
+  text("remote port as integer\n"),
+opt[(String)] ("addclasspath") abbr("a")  
valueName("") action {
+  case (x,c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+  } text("specifies additional jars to be used in Flink")
+
+  )
+  help("help") abbr("h") text("prints this usage text\n")
+}
+
+
+// parse arguments
+parser.parse (args, Config () ) match {
+  case Some(config) =>
+startShell(config.host,
+  config.port,
+  config.flinkShellExecutionMode,
+  config.externalJars)
+
+  case _ => println("Could not parse program arguments")
+}
+  }
+
+
+  def startShell(
+  userHost : String, 
+  userPort : Int,
+  executionMode : Int,
+  externalJars : Option[Array[String]] = None): Unit ={
+
+println("Starting Flink Shell:")
+
+var cluster: LocalFlinkMiniCluster = null
+
+// either port or userhost not specified by user, create new 
minicluster
+val (host,port) = if (executionMode == LOCAL) {
+  cluster = new LocalFlinkMiniCluster(new Configuration, false)
+  cluster.start()
+  val port = cluster.getLeaderRPCPort
+  println(s"\nStarting local Flink cluster (host: localhost, port: 
$port).\n")
+  ("localhost",port)
+} else if(executionMode == UNDEFINED) {
+  println("Error: please specify execution mode:")
+  println("[local | remote  ]")
+  return
+} else if(userHost == "none" || userPort == -1){
+  println("Error:  or  not specified!")
+  return
+} else {
+  println(s"\nConnecting to Flink cluster (host: $userHost, port: 
$userPort).\n")
+  (userHost, userPort)
+}
+
+
+// custom shell
+var 

[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801467
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+val miniCluster: ForkableFlinkMiniCluster =
+  new ForkableFlinkMiniCluster(new Configuration, false)
+
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
--- End diff --

Unnecessary semicolon.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40802535
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
+if(c != null){
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+}
+else{
+  assert(false)
+}
+
+//start scala shell with initialized
+// buffered reader for testing
+FlinkShell.bufferedReader = in;
+FlinkShell.main(args)
+baos.flush
+
+val output: String = baos.toString
+System.setOut(oldOut)
+
+assert(output.contains("Job execution switched to status FINISHED."))
+assert(output.contains("a\nb"))
+
+assert((!output.contains("Error")))
+assert((!output.contains("ERROR")))
+assert((!output.contains("Exception")))
+assert((!output.contains("failed")))
--- End diff --

Also, scalatest's matcher syntax would be better. For example:

```scala
output should not include "Error"
```

You can see statements like this at line 162-164 of 
`ScalaShellITSuite.scala`.
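
For completeness, a hedged sketch of the test's checks rewritten with matchers (the strings are the ones from the diff above):

```scala
// Positive checks: the job finished and els.print emitted both elements.
output should include("Job execution switched to status FINISHED.")
output should include("a\nb")
// Negative checks, without the extra parentheses.
output should not include "Error"
output should not include "ERROR"
output should not include "Exception"
output should not include "failed"
```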


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801655
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
+if(c != null){
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+}
+else{
+  assert(false)
+}
--- End diff --

Could you re-format this `if-else` block? Please add a space after keywords 
such as `if` or `else`, and remove the unnecessary newlines.
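
A hedged sketch of the requested formatting (content unchanged from the diff above; only whitespace and brace placement differ):

```scala
// Space after the keyword, `else` on the same line as the closing brace.
if (c != null) {
  args = Array("remote", c.hostname, Integer.toString(c.getLeaderRPCPort))
} else {
  assert(false)
}
```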


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40802725
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
+if(c != null){
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+}
+else{
+  assert(false)
--- End diff --

Please use the `fail` method instead of `assert(false)`.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40801825
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
+var args : Array[String] = null;
+if(c != null){
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+}
+else{
+  assert(false)
+}
+
+//start scala shell with initialized
+// buffered reader for testing
+FlinkShell.bufferedReader = in;
+FlinkShell.main(args)
+baos.flush
+
+val output: String = baos.toString
+System.setOut(oldOut)
+
+assert(output.contains("Job execution switched to status FINISHED."))
+assert(output.contains("a\nb"))
+
+assert((!output.contains("Error")))
+assert((!output.contains("ERROR")))
+assert((!output.contains("Exception")))
+assert((!output.contains("failed")))
--- End diff --

These assert statements contain unnecessary parentheses.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144432852
  
I just reviewed this PR and have a question: what is the difference between 
`ScalaShellLocalStartupITCase` and `ScalaShellITSuite`? Do we need another 
integration test?

BTW, there are some style issues.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r40811385
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -225,6 +220,51 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null)
+var args : Array[String] = null
+if (c != null) {
+  args = Array("remote",
+c.hostname,
+Integer.toString(c.getLeaderRPCPort))
+} else {
+  fail
--- End diff --

Oh, I forgot the usage of the `fail` method. `fail(error message)` would be 
better. The error message could give the reason for the failure, such as 
"Creating the cluster failed.".


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-144456895
  
@chiwanpark changed `fail` to give some reasons.
This is nicer indeed :+1: 


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-30 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-17780
  
@chiwanpark, fixed the style issues.
`ScalaShellLocalStartupITCase` starts the shell in local mode (i.e. it 
creates its own local cluster), invoked via the "command line" arguments.
`ScalaShellITSuite` creates a cluster before the Scala shell is invoked, 
which then acts as a "remote" cluster; here the general features are tested, 
as well as the invocation with "command line" arguments connecting to that 
"remote" cluster, as asked by @StephanEwen above.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-28 Thread nikste
Github user nikste closed the pull request at:

https://github.com/apache/flink/pull/1106


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-21 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-141910091
  
@fhueske, good point to check if the program was actually executed! 
@rmetzger fixed the directory structure as well.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-18 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-141401714
  
@nikste, since you need to change the pull request again, can you also fix 
the directory name of the classes

`flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala`


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-17 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-141066672
  
Should be fixed according to your comments @fhueske 


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39763329
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -212,6 +212,49 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
--- End diff --

Can you also add a check that the program was actually executed, for 
example by checking that the correct output was produced?
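
A hedged sketch of such a check, using the strings from the test input above:

```scala
// The job should have finished, and els.print should have emitted both elements.
assert(output.contains("Job execution switched to status FINISHED."))
assert(output.contains("a\nb"))
```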


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39763373
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupITCase extends FunSuite with Matchers {
+
+/**
+ * tests flink shell with local setup through startup script in bin 
folder
+ */
+test("start flink scala shell with local cluster") {
+
+  val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + 
"els.print\nError\n:q\n"
--- End diff --

Please check that the program was actually executed.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-17 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-141131210
  
Thanks for the update @nikste! I have only one minor thing to add.
After that it's good to merge, IMO.
Thanks!


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39654142
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+val miniCluster: ForkableFlinkMiniCluster =
--- End diff --

You are creating a new ForkableFlinkMiniCluster here but not using it later.


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39654250
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
 ---
@@ -212,6 +214,52 @@ class ScalaShellITSuite extends FunSuite with Matchers 
with BeforeAndAfterAll {
 out.toString + stdout
   }
 
+  /**
+   * tests flink shell startup with remote cluster (starts cluster 
internally)
+   */
+  test("start flink scala shell with remote cluster") {
+val miniCluster: ForkableFlinkMiniCluster =
+  new ForkableFlinkMiniCluster(new Configuration, false)
+
+
+val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
+  "els.print\nError\n:q\n"
+
+val in: BufferedReader = new BufferedReader(
+  new StringReader(
+input + "\n"))
+val out: StringWriter = new StringWriter
+
+val baos: ByteArrayOutputStream = new ByteArrayOutputStream
+val oldOut: PrintStream = System.out
+System.setOut(new PrintStream(baos))
+val c = cluster.getOrElse(null);
--- End diff --

You are not connecting to the miniCluster you started before.
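
A hedged sketch of the fix, roughly what the later revision of the test does (names as used in the diff above):

```scala
// Point the shell at the cluster the test actually started.
miniCluster.start()
val args = Array(
  "remote",
  miniCluster.hostname,
  Integer.toString(miniCluster.getLeaderRPCPort))
FlinkShell.main(args)
```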


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39653423
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupIT.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+/**
+ * Created by nikste on 9/16/15.
+ */
+@RunWith(classOf[JUnitRunner])
+class ScalaShellLocalStartupIT extends FunSuite with Matchers {
--- End diff --

We have the convention that integration test classes end with `ITCase` or 
`ITSuite`. Can you rename this class?


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1106#discussion_r39652554
  
--- Diff: 
flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupIT.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import java.io._
+
+import org.junit.runner.RunWith
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+
+/**
+ * Created by nikste on 9/16/15.
--- End diff --

Please remove this comment


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-14 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-140239647
  
The tests use the shell scripts (`.sh` or `.bat`) to start the Scala Shell 
and rely on finding these scripts in the `flink-dist/target` folder. I think 
this is a bit fragile. Why not use the `FlinkShell` class directly?
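
A hedged sketch of what driving the class directly could look like (mirroring the pattern the tests later adopt; `bufferedReader` is the hook the PR adds for feeding scripted input):

```scala
// No flink-dist scripts involved: call the shell's entry point in-process.
FlinkShell.bufferedReader = in   // scripted REPL input
FlinkShell.main(Array("local"))  // start against a fresh local mini cluster
```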


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-09 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1106#issuecomment-138863295
  
Looks good. Is it possible to cover both cases with tests?

1.  Start the shell in local mode, submit some commands.
2.  Start a `ForkableFlinkMiniCluster`, and start the shell in mode `remote "localhost" cluster.getJobManagerRpcPort()`?


---


[GitHub] flink pull request: [FLINK-2613] Print usage information for Scala...

2015-09-08 Thread nikste
GitHub user nikste opened a pull request:

https://github.com/apache/flink/pull/1106

[FLINK-2613] Print usage information for Scala Shell

changed startup of scala shell

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/nikste/flink FLINK-2613-enhanced-help-message

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1106.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1106


commit 8aa7e99fed606b3731fbd7b2ef7f14f4d72801fe
Author: Nikolaas Steenbergen 
Date:   2015-09-07T09:36:59Z

[FLINK-2613] Print usage information for Scala Shell, changed startup of 
scala shell




---