[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-26 Thread rmetzger
Github user rmetzger closed the pull request at:

https://github.com/apache/flink/pull/292


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-21 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23292258
  
--- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---
@@ -41,9 +41,9 @@ object ApplicationMaster{
   val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
 
   def main(args: Array[String]): Unit ={
-val yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME)
-LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
-  s"' setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}'")
+val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
+LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" +
--- End diff --

I'll leave the Scala string interpolation here. The message is logged only once, the INFO level is enabled by default, and the interpolation makes it easier to distinguish between the two usernames.
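The trade-off behind this reply is presumably the usual one: string interpolation (or `+` concatenation) builds the message before the logger checks the level, while SLF4J-style `{}` placeholders format it only when the level is enabled. The sketch below illustrates that difference with a hypothetical `LazyLoggingSketch` class that uses only the JDK; it is not SLF4J's or Flink's code.

```java
/**
 * Hypothetical minimal logger contrasting eager and deferred message construction.
 * Not SLF4J: it only mimics SLF4J-style "{}" placeholders.
 */
final class LazyLoggingSketch {
    /** Counts how often a message string is actually built. */
    static int formatCalls = 0;

    /** Collected log output, so the effect is easy to inspect. */
    static final StringBuilder out = new StringBuilder();

    /** Replaces each "{}" in the template with the next argument. */
    static String format(String template, Object... args) {
        formatCalls++;
        StringBuilder sb = new StringBuilder();
        int start = 0;
        int argIdx = 0;
        int pos;
        while (argIdx < args.length && (pos = template.indexOf("{}", start)) >= 0) {
            sb.append(template, start, pos).append(args[argIdx++]);
            start = pos + 2;
        }
        sb.append(template, start, template.length());
        return sb.toString();
    }

    /** Deferred: the message is built only if INFO is enabled. */
    static void info(boolean infoEnabled, String template, Object... args) {
        if (infoEnabled) {
            out.append(format(template, args)).append('\n');
        }
    }

    /** Eager: the caller has already built the message before the level check. */
    static void infoEager(boolean infoEnabled, String message) {
        if (infoEnabled) {
            out.append(message).append('\n');
        }
    }

    public static void main(String[] args) {
        String daemonUser = "yarn";
        String clientUser = "alice";
        // Eager: format(...) runs although INFO is off.
        infoEager(false, format("YARN daemon runs as {}, setting user to {}", daemonUser, clientUser));
        // Deferred: nothing is formatted because INFO is off.
        info(false, "YARN daemon runs as {}, setting user to {}", daemonUser, clientUser);
        System.out.println("format calls with INFO off: " + formatCalls); // prints 1
        info(true, "YARN daemon runs as {}, setting user to {}", daemonUser, clientUser);
        System.out.print(out);
    }
}
```

For a message that is logged once at startup with INFO enabled, both variants build the string exactly once, which is why the eager form costs essentially nothing in this particular spot.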



[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169293
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -860,8 +910,50 @@ protected Properties getYarnProperties() throws IOException {
return yarnProperties;
}

-   protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
-   return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
+   protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException {
+   String jmAddrString = getJobManagerAddressString(line);
+   InetSocketAddress jobManagerAddress = null;
+   if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+   System.out.println("YARN cluster mode detected. Switching Log4j output to console");
+   LogManager.getRootLogger().addAppender(new ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
--- End diff --

Do we have to hardwire log4j into our code? That contradicts the purpose of slf4j. It would be great if we could get rid of that.



[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23169866
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+   // Constants   -
+
+   private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+   public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+   public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+
+   private static final int CLIENT_POLLING_INTERVALL = 3;
+
+
+   // Command Line argument options -
+   // the prefix transformation is used by the CliFrontend static constructor.
+   private final Option QUERY;
+   // --- or ---
+   private final Option QUEUE;
+   private final Option SHIP_PATH;
+   private final Option FLINK_JAR;
+   private final Option JM_MEMORY;
+   private final Option TM_MEMORY;
+   private final Option CONTAINER;
+   private final Option SLOTS;
+
+   /**
+    * Dynamic properties allow the user to specify additional configuration values with -D, such as
+    *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+    */
+   private final Option DYNAMIC_PROPERTIES;
+
+   private AbstractFlinkYarnCluster yarnCluster = null;
+
+   public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+   QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
+   QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+   SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+   FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+   JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+   TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+   CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+   SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
+   DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic 
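
The diff is cut off before it shows how `DYNAMIC_PROPERTIES` is consumed. As a rough sketch, assuming the option yields raw strings such as `fs.overwrite-files=true` (the text after `-D`), turning them into configuration entries could look like this. `DynamicPropertiesSketch.parse` is a hypothetical helper, not part of the PR.

```java
import java.util.Properties;

/** Hypothetical helper: turns "-D" values such as "fs.overwrite-files=true" into Properties. */
final class DynamicPropertiesSketch {

    static Properties parse(String[] dynamicValues) {
        Properties props = new Properties();
        if (dynamicValues == null) {
            return props; // no -D options were given
        }
        for (String kv : dynamicValues) {
            // Split on the first '=' only, so values may themselves contain '='.
            int eq = kv.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value after -D, got: " + kv);
            }
            props.setProperty(kv.substring(0, eq).trim(), kv.substring(eq + 1).trim());
        }
        return props;
    }

    public static void main(String[] args) {
        // The two examples from the Javadoc of DYNAMIC_PROPERTIES.
        Properties p = parse(new String[] {
            "fs.overwrite-files=true",
            "taskmanager.network.numberOfBuffers=16368"
        });
        System.out.println(p.getProperty("fs.overwrite-files"));                  // true
        System.out.println(p.getProperty("taskmanager.network.numberOfBuffers")); // 16368
    }
}
```

Rejecting malformed entries early gives the user an error at submission time instead of a silently ignored setting.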

[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/292#discussion_r23171500
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java ---
@@ -86,7 +86,7 @@ public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) th
case SLOW_CONNECT:
boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
if (correct) {
-   LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+   LOG.info("Determined " + i + " as the machine's own IP address");
--- End diff --

Same here with the logging string.



[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-18 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/292#issuecomment-70415876
  
A good piece of work! I like that there is much better test coverage for the YARN cluster now.

@rmetzger Can you comment on a few open questions?

 - There are two projects for YARN: flink-yarn and flink-yarn-tests. Can we combine them into one project? We only put the flink-scala tests into a separate project because the Scala macros used in flink-scala caused problems otherwise. Since the YARN project does not use macros, I see no reason for the separation.

 - The example for the YARN session should probably not refer to a local file (file:///...).

 - The mechanism for computing the ports of the application master (base port plus attempt ID, or similar) is a bit strange. Can it be simplified? Also, what happens if that port is already in use by chance?

 - How hard would it be to make the client parameter that starts a YARN session for the job simpler? Something like `-y` rather than `-m yarn-cluster`?



[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-18 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/292#issuecomment-70416726
  
Thank you for the feedback. I'll address the inline comments.

Regarding the questions:
1. The separation between `flink-yarn` and `flink-yarn-tests` exists because `flink-yarn-tests` expects the `flink-dist` package to be built before the `flink-yarn-tests` package.
The tests really start a YARN cluster and deploy a Flink fat jar to it, so `flink-dist` has to be built first. I asked on the Maven mailing list whether there is a way to simplify this. It seems to be possible to store the assembly descriptors (used to build the fat jar) in a separate Maven module, which is then accessed by `flink-dist` and by a `prepare-tests` phase of `flink-yarn`. But that approach would require an additional Maven module (something like `flink-assemblies`, to make the assembly descriptor independent of the `flink-dist` package).

  *tl;dr* Maven is not flexible enough for a better solution.
2. Yep. That doesn't make much sense (I used a local YARN cluster for testing, which is why I didn't stumble across it). I'll fix it.
3. This was true for the old YARN client (before this pull request). As you can see here:
https://github.com/apache/flink/pull/292/files#diff-37b2363833862d636afea47fab39a694L269
 I removed the code that computed the port. The new YARN client allocates ALL ports dynamically (web frontend, RPC). I'm using YARN to transfer the RPC port of the AM to the client.
4. Probably not hard. It's a matter of taste, I guess. I think my approach is more flexible (imagine we want to have a `-m mesos-cluster` or a `-m flink-local` at some point). Also, we would need to throw an exception if a user sets something like `-j -m myCluster:6123`. It would be good to get some more opinions here.
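
To make the `-m` design in point 4 concrete, here is a sketch of how a `-m` value might be dispatched either to a per-job YARN deployment or to an existing JobManager at `host:port`. The sentinel value `yarn-cluster` for `YARN_DEPLOY_JOBMANAGER` is an assumption (the `CliFrontend` diff only shows the constant's name), and `JobManagerTargetSketch` is hypothetical.

```java
import java.net.InetSocketAddress;

/** Hypothetical resolution of the -m argument into a deployment target. */
final class JobManagerTargetSketch {
    // Assumed value; the diff only shows the constant name YARN_DEPLOY_JOBMANAGER.
    static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";

    /** Either "deploy a per-job YARN cluster" or "connect to an existing JobManager". */
    static final class Target {
        final boolean yarnCluster;
        final InetSocketAddress address; // null when yarnCluster is true

        Target(boolean yarnCluster, InetSocketAddress address) {
            this.yarnCluster = yarnCluster;
            this.address = address;
        }
    }

    static Target resolve(String mValue) {
        if (YARN_DEPLOY_JOBMANAGER.equals(mValue)) {
            return new Target(true, null);
        }
        // Further keywords would be checked here, before falling back to host:port.
        int colon = mValue.lastIndexOf(':');
        if (colon <= 0 || colon == mValue.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected host:port or '" + YARN_DEPLOY_JOBMANAGER + "', got: " + mValue);
        }
        String host = mValue.substring(0, colon);
        int port = Integer.parseInt(mValue.substring(colon + 1));
        // createUnresolved avoids a DNS lookup while parsing the command line.
        return new Target(false, InetSocketAddress.createUnresolved(host, port));
    }

    public static void main(String[] args) {
        System.out.println(resolve("yarn-cluster").yarnCluster); // true
        Target t = resolve("myCluster:6123");
        System.out.println(t.address.getHostString() + " " + t.address.getPort()); // myCluster 6123
    }
}
```

With a single `-m` option, targets such as `mesos-cluster` or `flink-local` become additional keywords, and conflicting combinations cannot arise; a separate `-y` flag would instead need an explicit check against `-m host:port`.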



[GitHub] flink pull request: [FLINK-1295][FLINK-883] Allow to deploy 'job o...

2015-01-09 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/292

[FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN

- Users can now also deploy Flink on YARN for executing a single job.
- The flink-yarn project has been moved out of the flink-addons module.
- The MiniYARNCluster is used for testing Flink on YARN.
- There is now an (undocumented) Java interface to Flink's YARN client, allowing users to control the YARN session manually.
- ALL ports used by Flink when running on YARN are now determined automatically; YARN tells the client the RPC address of the application master. In the past, users reported problems with blocked ports.
- The checks before deployment have been improved to give better error messages if the user requests too many resources for a YARN session.
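
A common way to determine ports automatically, sketched here with JDK classes only, is to bind to port 0, let the operating system choose a free port, and then report the chosen port to whoever needs it. The description only says that YARN tells the client the application master's RPC address; in the YARN API, `AMRMClient#registerApplicationMaster(host, rpcPort, trackingUrl)` together with the client-side `ApplicationReport#getRpcPort()` is one such channel, but whether the PR uses exactly these calls is an assumption. `EphemeralPortSketch` is hypothetical.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical sketch: let the OS pick a free port instead of computing one. */
final class EphemeralPortSketch {

    /**
     * Binds to port 0, which asks the operating system for any currently free port.
     * Loopback keeps the sketch local; a real service would bind to an address the client can reach.
     */
    static ServerSocket bindToFreePort() throws IOException {
        return new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = bindToFreePort()) {
            int port = socket.getLocalPort();
            // This is the value that would be reported back (e.g. through YARN) so the client can connect.
            System.out.println("Bound to free port " + port);
        }
    }
}
```

Keeping the bound socket open and handing it to the server avoids the race in which a port is probed, released, and then taken by another process before it is reused.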

The change has been tested on Google Compute Engine (click-to-deploy, with Google Cloud Storage instead of HDFS) and on Amazon EMR (with HDFS).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink1295-flink883-rebased-after-akka

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/292.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #292


commit 273cafbf44b5796b96ff2396b0f6e68f79ce9185
Author: Robert Metzger rmetz...@apache.org
Date:   2014-12-01T17:59:49Z

[FLINK-1295][FLINK-883] Allow to deploy 'job only' YARN cluster. Add tests to YARN

- Users can now also deploy Flink on YARN for executing a single job.
- The flink-yarn project has been moved out of the flink-addons module.
- The MiniYARNCluster is used for testing Flink on YARN.
- There is now an (undocumented) Java interface to Flink's YARN client, allowing users to control the YARN session manually.
- ALL ports used by Flink when running on YARN are now determined automatically; YARN tells the client the RPC address of the application master. In the past, users reported problems with blocked ports.
- The checks before deployment have been improved to give better error messages if the user requests too many resources for a YARN session.
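
The improved pre-deployment checks are not shown in this thread. A minimal sketch of the idea, assuming the client already knows the scheduler's maximum container size and the free memory across all NodeManagers: compare each requested container and the total request against those limits and collect readable messages. `YarnResourceCheckSketch` and its parameters are hypothetical; a real check would also have to consider per-node fragmentation and virtual cores.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-deployment check for a YARN session request (all sizes in MB). */
final class YarnResourceCheckSketch {

    /**
     * Returns readable problems; an empty list means the request passes these simple checks.
     * maxContainerMb: largest container the scheduler will grant (assumed known).
     * freeClusterMb: free memory summed over all NodeManagers (assumed known).
     */
    static List<String> check(int jmMb, int tmMb, int numTaskManagers,
                              int maxContainerMb, long freeClusterMb) {
        List<String> problems = new ArrayList<>();
        if (jmMb > maxContainerMb) {
            problems.add("JobManager memory (" + jmMb + " MB) exceeds the maximum container size ("
                    + maxContainerMb + " MB)");
        }
        if (tmMb > maxContainerMb) {
            problems.add("TaskManager memory (" + tmMb + " MB) exceeds the maximum container size ("
                    + maxContainerMb + " MB)");
        }
        // Use long arithmetic so large requests do not overflow.
        long totalMb = jmMb + (long) tmMb * numTaskManagers;
        if (totalMb > freeClusterMb) {
            problems.add("The session requests " + totalMb + " MB in total, but only "
                    + freeClusterMb + " MB are free in the cluster");
        }
        return problems;
    }

    public static void main(String[] args) {
        // 1 JobManager with 1024 MB plus 10 TaskManagers with 4096 MB each = 41984 MB.
        for (String problem : check(1024, 4096, 10, 8192, 20000)) {
            System.out.println(problem);
        }
    }
}
```

Collecting all problems instead of failing on the first one lets the user fix an oversized request in a single attempt.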



