[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1500


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-24 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-200804302
  
Oh, thanks for pointing that out, @mxm. Then I'll merge this.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-24 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-200770572
  
Thanks @chiwanpark! As far as I can see, there are no overlapping classes 
between the two pull requests.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-03-22 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-199796720
  
Hi all, I would like to merge this. If there are no objections and Travis
passes, I'll merge this to master in a few days.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-179160128
  
The changes look good. I left only one minor comment. After addressing it, 
I think we can merge the PR.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-02-03 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-179160309
  
@tillrohrmann Thanks for the review. I'll address the comment today.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r51705046
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -417,13 +417,9 @@ public int run(String[] args) {
String jobManagerAddress = 
yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + 
yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on 
" + jobManagerAddress);
System.out.println("JobManager Web Interface: " + 
yarnCluster.getWebInterfaceURL());
-   // file that we write into the conf/ dir containing the 
jobManager address and the dop.
-
-   String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
-   String currentUser = System.getProperty("user.name");
-   String propertiesFileLocation = 
yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION,
 defaultPropertiesFileLocation);
 
-   File yarnPropertiesFile = new 
File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE 
+ currentUser);
+   // file that we write into the conf/ dir containing the 
jobManager address and the dop.
+   File yarnPropertiesFile = new 
File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
--- End diff --

The yarnPropertiesFile path construction is also done in `CliFrontend:173`. 
This should also be changed to use the new `getYarnPropertiesLocation` method. 
Maybe moving this method to `CliFrontend` makes sense.
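
As a rough illustration, a shared helper along these lines could look like the 
following Scala sketch. The config key `ConfigConstants.YARN_PROPERTIES_FILE_LOCATION` 
and the temp-dir/user-name fallback come from the removed lines in the diff above; 
the `.yarn-properties-` prefix and the object name are assumptions, and the actual 
helper in the PR is a Java method, so treat this only as a sketch.

~~~scala
import java.io.File
import org.apache.flink.configuration.{ConfigConstants, Configuration}

object YarnPropertiesSketch {
  // Resolve the YARN properties file path: the configured location (or the
  // JVM temp dir as a fallback) plus a per-user file name.
  def getYarnPropertiesLocation(conf: Configuration): String = {
    val defaultLocation = System.getProperty("java.io.tmpdir")
    val location = conf.getString(
      ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultLocation)
    val currentUser = System.getProperty("user.name")
    location + File.separator + ".yarn-properties-" + currentUser
  }
}
~~~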




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-02-03 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-179182603
  
I've addressed @tillrohrmann's comment. Once CI passes, we can merge this.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-02-03 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-179643330
  
The failing test seems unrelated to this PR.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-176839108
  
@tillrohrmann do you have any comments on @chiwanpark's updates?




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-27 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-175588417
  
@tillrohrmann Thanks for the review! I have addressed your comments.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-27 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-175600967
  
About the type of the number of containers for TaskManagers: it should be 
optional, because the user can start the Scala shell without specifying the 
number when connecting to an already deployed YARN cluster 
(`bin/start-scala-shell.sh yarn`).

The number is required only when deploying a new YARN cluster for the shell.
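
To make the distinction concrete, here is a small Scala sketch: the 
`YarnConfig` case class with an optional `containers` field mirrors the config 
introduced in this PR's diff, while the object and the `modeFor` helper are 
purely illustrative assumptions.

~~~scala
object YarnModeSketch {
  // containers is optional: None means "connect to the already deployed YARN
  // cluster via the yarn properties file", Some(n) means "deploy a new
  // cluster with n TaskManager containers".
  case class YarnConfig(containers: Option[Int] = None)

  def modeFor(yarn: YarnConfig): String = yarn.containers match {
    case Some(n) => s"deploy a new Flink cluster on YARN with $n TaskManagers"
    case None    => "connect to the already deployed Flink cluster on YARN"
  }
}
~~~

So `bin/start-scala-shell.sh yarn` corresponds to `containers = None`, while 
adding `-n` fills in `Some(n)` and triggers the deployment path.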




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49995125
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
+  )
+
+  help("help") abbr ("h") text ("prints this usage text\n")
 }
 
 // parse arguments
-parser.parse (args, Config()) match {
-  case Some(config) =>
-startShell(config.host,
-  config.port,
-  config.flinkShellExecutionMode,
-  config.externalJars)
-
-  case _ => System.out.println("Could not parse program arguments")
+parser.parse(args, Config()) match {
+  case Some(config) => startShell(config)
+  case _ => println("Could not parse program arguments")
 }
   }
 
+  def fetchConnectionInfo(
+config: Config
+  ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
+config.executionMode match {
+  case ExecutionMode.LOCAL => // Local mode
+val config = new Configuration()
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
-  def startShell(
-  userHost: String,
-  userPort: Int,
-  executionMode: ExecutionMode.Value,
-  externalJars: Option[Array[String]] = None): Unit ={
-
-System.out.println("Starting Flink Shell:")
-
-// either port or userhost not specified by user, create new 
minicluster
-val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
-  executionMode match {
-case ExecutionMode.LOCAL =>
-  val config = new Configuration()
-  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-  val miniCluster = new LocalFlinkMiniCluster(config, false)
- 

[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-172526377
  
Thanks for your work @chiwanpark. On the whole your changes look good; I left 
some minor comments.

I also think that this won't work if the YARN cluster was started with HA and 
the first master has died. The reason is that the `.yarn-properties` file is 
read to connect to the `JobManager`, but this file is never updated when a new 
leader is elected. However, this is a problem that currently also affects 
normal Flink jobs.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1500#issuecomment-172531939
  
I have to correct myself. This should not be an issue if you start a YARN 
cluster in HA mode and then use `bin/flink run ...`, because a 
`LeaderRetrievalService` will be instantiated that takes care of this.

However, it is a problem here. The `FlinkILoop` instantiates a 
`ScalaShellRemoteEnvironment`, which is a subclass of `RemoteEnvironment`. 
Since `RemoteEnvironment` is called with `null` for the client configuration, 
it cannot find the ZooKeeper instance where the `JobManager` is registered in 
the HA case. I think you have to forward the `Configuration` object created 
from `flink-conf.yaml` to the `RemoteEnvironment` in order to make it work.
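
A hypothetical sketch of that direction is below; the class name, constructor 
shape, and `externalJars` parameter are assumptions for illustration only, not 
the actual `RemoteEnvironment`/`ScalaShellRemoteEnvironment` API.

~~~scala
import org.apache.flink.configuration.Configuration

// Illustrative only: a remote-environment-like wrapper that insists on a real
// client configuration, so the ZooKeeper-based JobManager lookup can succeed
// when the cluster runs in HA mode.
class ShellRemoteEnvironmentSketch(
    host: String,
    port: Int,
    clientConfig: Configuration,
    externalJars: Seq[String] = Seq.empty) {
  require(clientConfig != null,
    "forward the Configuration created from flink-conf.yaml instead of null")
}
~~~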




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49995630
  
--- Diff: flink-staging/flink-scala-shell/start-script/start-scala-shell.sh 
---
@@ -75,11 +75,20 @@ do
 fi
 done
 
+log_setting=""
+
+if [[ $1 = "yarn" ]]
+then

+FLINK_CLASSPATH=$FLINK_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
--- End diff --

Why name the log file the same way as when you start a dedicated YARN session 
via `yarn-session.sh`? Isn't this a bit confusing? Why not call it 
`flink-$FLINK_IDENT_STRING-scala-shell-yarn-$HOSTNAME.log`?




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49992667
  
--- Diff: docs/apis/scala_shell.md ---
@@ -69,6 +69,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect Flink cluster on YARN. To connect deployed 
Flink cluster on YARN, use following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster 
from a `.yarn-properties` file in temporary directory. If there is no deployed 
Flink cluster on YARN, the shell prints error message.
--- End diff --

prints *an* error message.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993429
  
--- Diff: docs/apis/scala_shell.md ---
@@ -69,6 +69,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect Flink cluster on YARN. To connect deployed 
Flink cluster on YARN, use following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster 
from a `.yarn-properties` file in temporary directory. If there is no deployed 
Flink cluster on YARN, the shell prints error message.
+
+The shell can deploy Flink cluster to YARN for the shell only. If you add 
an parameter `-n ` which means a number of containers, the shell deploy a 
new Flink cluster on YARN and connect the cluster. You can also specify options 
for YARN cluster such as memory for JobManager, name of YARN application, etc.. 
--- End diff --

The shell deploy*s* ... and connect*s to* the cluster.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993393
  
--- Diff: docs/apis/scala_shell.md ---
@@ -69,6 +69,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect Flink cluster on YARN. To connect deployed 
Flink cluster on YARN, use following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster 
from a `.yarn-properties` file in temporary directory. If there is no deployed 
Flink cluster on YARN, the shell prints error message.
+
+The shell can deploy Flink cluster to YARN for the shell only. If you add 
an parameter `-n ` which means a number of containers, the shell deploy a 
new Flink cluster on YARN and connect the cluster. You can also specify options 
for YARN cluster such as memory for JobManager, name of YARN application, etc.. 
--- End diff --

The number of YARN containers can be controlled by the parameter `-n 
`. 




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49995021
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
+  )
+
+  help("help") abbr ("h") text ("prints this usage text\n")
 }
 
 // parse arguments
-parser.parse (args, Config()) match {
-  case Some(config) =>
-startShell(config.host,
-  config.port,
-  config.flinkShellExecutionMode,
-  config.externalJars)
-
-  case _ => System.out.println("Could not parse program arguments")
+parser.parse(args, Config()) match {
+  case Some(config) => startShell(config)
+  case _ => println("Could not parse program arguments")
 }
   }
 
+  def fetchConnectionInfo(
+config: Config
+  ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
+config.executionMode match {
+  case ExecutionMode.LOCAL => // Local mode
+val config = new Configuration()
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
-  def startShell(
-  userHost: String,
-  userPort: Int,
-  executionMode: ExecutionMode.Value,
-  externalJars: Option[Array[String]] = None): Unit ={
-
-System.out.println("Starting Flink Shell:")
-
-// either port or userhost not specified by user, create new 
minicluster
-val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
-  executionMode match {
-case ExecutionMode.LOCAL =>
-  val config = new Configuration()
-  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-  val miniCluster = new LocalFlinkMiniCluster(config, false)
- 

[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993273
  
--- Diff: docs/apis/scala_shell.md ---
@@ -69,6 +69,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect Flink cluster on YARN. To connect deployed 
Flink cluster on YARN, use following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster 
from a `.yarn-properties` file in temporary directory. If there is no deployed 
Flink cluster on YARN, the shell prints error message.
+
+The shell can deploy Flink cluster to YARN for the shell only. If you add 
an parameter `-n ` which means a number of containers, the shell deploy a 
new Flink cluster on YARN and connect the cluster. You can also specify options 
for YARN cluster such as memory for JobManager, name of YARN application, etc.. 
--- End diff --

deploy *a* Flink cluster to YARN, *which is used exclusively by the shell*.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993179
  
--- Diff: docs/apis/scala_shell.md ---
@@ -69,6 +69,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect Flink cluster on YARN. To connect deployed 
Flink cluster on YARN, use following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster 
from a `.yarn-properties` file in temporary directory. If there is no deployed 
Flink cluster on YARN, the shell prints error message.
--- End diff --

from *the* `.yarn-properties-USER` file*, which is created in the 
configured `yarn.properties-file.location` directory or the temporary 
directory*.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993897
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
--- End diff --

container




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993916
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
--- End diff --

container




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993833
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
--- End diff --

TaskManagers




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49993975
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
--- End diff --

Capital letter for starting text. Do we need the `\n` at the end?




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49994742
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
+  )
+
+  help("help") abbr ("h") text ("prints this usage text\n")
 }
 
 // parse arguments
-parser.parse (args, Config()) match {
-  case Some(config) =>
-startShell(config.host,
-  config.port,
-  config.flinkShellExecutionMode,
-  config.externalJars)
-
-  case _ => System.out.println("Could not parse program arguments")
+parser.parse(args, Config()) match {
+  case Some(config) => startShell(config)
+  case _ => println("Could not parse program arguments")
 }
   }
 
+  def fetchConnectionInfo(
+config: Config
+  ): (String, Int, Option[Either[FlinkMiniCluster, 
AbstractFlinkYarnCluster]]) = {
+config.executionMode match {
+  case ExecutionMode.LOCAL => // Local mode
+val config = new Configuration()
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
 
-  def startShell(
-  userHost: String,
-  userPort: Int,
-  executionMode: ExecutionMode.Value,
-  externalJars: Option[Array[String]] = None): Unit ={
-
-System.out.println("Starting Flink Shell:")
-
-// either port or userhost not specified by user, create new 
minicluster
-val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
-  executionMode match {
-case ExecutionMode.LOCAL =>
-  val config = new Configuration()
-  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-  val miniCluster = new LocalFlinkMiniCluster(config, false)
- 

[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49994064
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
+  )
+
+  help("help") abbr ("h") text ("prints this usage text\n")
--- End diff --

Do we need the `\n`?




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1500#discussion_r49994051
  
--- Diff: 
flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
 ---
@@ -61,99 +73,217 @@ object FlinkShell {
 )
 
   cmd("remote") action { (_, c) =>
-c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
+c.copy(executionMode = ExecutionMode.REMOTE)
   } text("starts Flink scala shell connecting to a remote cluster\n") 
children(
 arg[String]("") action { (h, c) =>
-  c.copy(host = h) }
+  c.copy(host = Some(h)) }
   text("remote host name as string"),
 arg[Int]("") action { (p, c) =>
-  c.copy(port = p) }
+  c.copy(port = Some(p)) }
   text("remote port as integer\n"),
 opt[(String)]("addclasspath") abbr("a") valueName("") 
action {
   case (x, c) =>
 val xArray = x.split(":")
 c.copy(externalJars = Option(xArray))
-  } text("specifies additional jars to be used in Flink")
+} text ("specifies additional jars to be used in Flink")
   )
-  help("help") abbr("h") text("prints this usage text\n")
+
+  cmd("yarn") action {
+(_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = 
None)
+  } text ("starts Flink scala shell connecting to a yarn cluster\n") 
children(
+opt[Int]("container") abbr ("n") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = 
Some(x
+} text ("Number of YARN container to allocate (= Number of Task 
Managers)"),
+opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x
+} text ("Memory for JobManager Container [in MB]"),
+opt[String]("name") abbr ("nm") action {
+  (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name 
= Some(x
+} text ("Set a custom name for the application on YARN"),
+opt[String]("queue") abbr ("qu") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(queue = Some(x
+} text ("Specify YARN queue"),
+opt[Int]("slots") abbr ("s") valueName ("") action {
+  (x, c) => c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(slots = Some(x
+} text ("Number of slots per TaskManager"),
+opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") 
action {
+  (x, c) =>
+c.copy(yarnConfig = 
Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x
+} text ("Memory per TaskManager Container [in MB]"),
+opt[(String)] ("addclasspath") abbr("a") 
valueName("") action {
+  case (x, c) =>
+val xArray = x.split(":")
+c.copy(externalJars = Option(xArray))
+} text("specifies additional jars to be used in Flink\n")
+  )
+
+  help("help") abbr ("h") text ("prints this usage text\n")
--- End diff --

All other descriptions start with a capital letter. Should be uniformly 
handled.




[GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...

2016-01-12 Thread chiwanpark
GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/1500

[FLINK-2935] [scala-shell] Allow Scala shell to connect Flink cluster on 
YARN

Please check the [JIRA 
issue](https://issues.apache.org/jira/browse/FLINK-2935) related to this PR. 
This PR has been tested with Hadoop YARN 2.6.2 and works on both Scala 2.10 
and 2.11.

Because the `parseHostPortAddress` method was duplicated in the `CliFrontend` 
and `RemoteExecutor` classes, I moved it to a new `ClientUtils` class. This PR 
also contains some refactoring of the `FlinkShell` class.
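
For illustration, a consolidated host/port parsing helper could look like the 
Scala sketch below; the actual `ClientUtils` added by this PR is a Java class, 
and the object name and error handling here are assumptions.

~~~scala
import java.net.InetSocketAddress

object ClientUtilsSketch {
  // Parse a "host:port" string into a socket address; IPv6 literals and
  // richer validation are left out of this sketch.
  def parseHostPortAddress(hostPort: String): InetSocketAddress =
    hostPort.split(":", 2) match {
      case Array(host, port) => new InetSocketAddress(host, port.toInt)
      case _ => throw new IllegalArgumentException(
        s"Expected host:port, got '$hostPort'")
    }
}
~~~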

Any comments are welcome.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink FLINK-2935

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1500


commit 4e57f68133f7b59b928c74cf165d6920f8c4d11c
Author: Chiwan Park 
Date:   2015-12-07T12:05:03Z

[FLINK-2935] [scala-shell] Allow Scala shell to connect Flink cluster on 
YARN

  - Remove duplicated parseHostPortAddress method (Move it to ClientUtils 
class)
  - Refactor FlinkShell class



