[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-87967427
  
Thank you very much for the review, Henry!

I addressed your feedback where applicable. I'll merge the change now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456302
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
this.sessionFilesDir = sessionFilesDir;
this.applicationId = appId;
this.detached = detached;
+   this.flinkConfig = flinkConfig;
+   this.appId = appId;
 
// get one application report manually
intialAppReport = yarnClient.getApplicationReport(appId);
String jobManagerHost = intialAppReport.getHost();
int jobManagerPort = intialAppReport.getRpcPort();
this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+   }
 
-   if(!detached) {
-   // start actor system
-   LOG.info("Start actor system.");
-   InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
-   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-   new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
+   /**
+* Connect the FlinkYarnCluster to the ApplicationMaster.
+*
+* Detached YARN sessions don't need to connect to the ApplicationMaster.
+* Detached per job YARN sessions need to connect until the required number of TaskManagers have been started.
+* 
+* @throws IOException
+*/
+   public void connectToCluster() throws IOException {
+   if(isConnected) {
+   throw new IllegalStateException("Can not connect to the cluster again");
+   }
 
-   // start application client
-   LOG.info("Start application client.");
+   // start actor system
+   LOG.info("Start actor system.");
+   InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+   new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
 
-   applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+   // start application client
+   LOG.info("Start application client.");
 
-   // instruct ApplicationClient to start a periodical status polling
-   applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+   applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient");
 
+   // instruct ApplicationClient to start a periodical status polling
+   applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
 
-   // add hook to ensure proper shutdown
-   Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
-   actorRunner = new Thread(new Runnable() {
-   @Override
-   public void run() {
-   // blocks until ApplicationMaster has been stopped
-   actorSystem.awaitTermination();
+   actorRunner = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   // blocks until ApplicationMaster has been stopped
+   actorSystem.awaitTermination();
 
-   // get final application report
-   try {
-   ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-   LOG.info("Application " + appId + " finished with state " + appReport
-   .getYarnApplicationState() + " and final state " + appReport
-   .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-   if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-   == YarnApplicationState.KILLED) {
-   LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-  

[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456202
  
--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---
@@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);
 
-   Optimizer pc = new Optimizer(new DataStatistics());
+   Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
--- End diff --

The executor here is of type `ForkableFlinkMiniCluster`. We have too many 
local test clusters in Flink :(


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456299
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -265,12 +266,32 @@ protected int run(String[] args) {
}
 
try {
-   Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
-   int parallelism = options.getParallelism();
-   int exitCode = executeProgram(program, client, parallelism);
-
-   if (yarnCluster != null) {
+   int userParallelism = options.getParallelism();
+   LOG.debug("User parallelism is set to {}", userParallelism);
+
+   Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+   LOG.debug("Client slots is set to {}", client.getMaxSlots());
+   if(client.getMaxSlots() != -1 && userParallelism == -1) {
+   logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+   "To use another parallelism, set it at the ./bin/flink client.");
+   userParallelism = client.getMaxSlots();
+   }
+   int exitCode = 0;
+
+   // check if detached per job yarn cluster is used to start flink
+   if(yarnCluster != null && yarnCluster.isDetached()) {
+   logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+   "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+   "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+   "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+   executeProgram(program, client, userParallelism, false);
+   } else {
+   // regular (blocking) execution.
+   exitCode = executeProgram(program, client, userParallelism, true);
+   }
+
+   // show YARN cluster status if its not a detached YARN cluster.
+   if (yarnCluster != null && !yarnCluster.isDetached()) {
--- End diff --

I left the code there so that it benefits from the nicer error handling of the
enclosing try/catch block: in the catch clause, we use the `handleError()`
method to show a readable error message.
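
The trade-off between this reply and the suggestion to move the status code into the `finally` block can be illustrated with a small, self-contained Java sketch. `FinallyDemo`, `showClusterStatus()` and this `handleError()` are hypothetical stand-ins, not the actual `CliFrontend` code. A `finally` block does run after the `try` block has returned, but an exception thrown inside `finally` is not handled by the `catch` clause of the same `try`, so it would bypass `handleError()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the CliFrontend flow discussed in this review.
final class FinallyDemo {
    static final List<String> log = new ArrayList<>();

    // Stand-in for "show YARN cluster status"; may fail at runtime.
    static void showClusterStatus(boolean fail) {
        if (fail) {
            throw new IllegalStateException("could not fetch YARN cluster status");
        }
        log.add("status shown");
    }

    // Code kept inside try: a failure is routed through the catch handler.
    static int statusInsideTry(boolean fail) {
        try {
            showClusterStatus(fail);
            return 0;
        } catch (Exception e) {
            return handleError(e);
        }
    }

    // Code moved into finally: it still runs after `return 0`,
    // but an exception thrown here is NOT handled by the catch clause above.
    static int statusInFinally(boolean fail) {
        try {
            return 0;
        } catch (Exception e) {
            return handleError(e);
        } finally {
            showClusterStatus(fail);
        }
    }

    // Stand-in for the "nice error message" handler; returns an error exit code.
    static int handleError(Exception e) {
        log.add("handled: " + e.getMessage());
        return 1;
    }
}
```

With `fail = true`, `statusInsideTry` returns the error code produced by `handleError()`, while `statusInFinally` lets the `IllegalStateException` escape to the caller.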


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456233
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
}

// 

-   
+
+   public Configuration getConfiguration() {
--- End diff --

Thanks. I've changed the method.


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456244
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
program.deleteExtractedLibraries();
}
 
-   LOG.info("Program execution finished");
+   if(wait) {
+   LOG.info("Program execution finished");
+   }
 
-   // we come here after the job has finished
+   // we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
-   System.out.println("Job Runtime: " + execResult.getNetRuntime());
-   Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-   if (accumulatorsResult.size() > 0) {
-   System.out.println("Accumulator Results: ");
-   System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+   // if the job has been submitted to a detached YARN cluster, there won't be any
+   // exec results, but the object will be set (for the job id)
+   if(yarnCluster != null && yarnCluster.isDetached()) {
--- End diff --

Done.
I think we should add a checkstyle rule for this.


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27456228
  
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala ---
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.optimizer.util.CompilerTestBase
--- End diff --

Yes, IntelliJ sometimes relocates imports when refactoring.


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/542


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-31 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-88136878
  
Awesome!


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-87817661
  
@rmetzger, the PR is too large to review effectively =(

Could you kindly summarize the significant changes made to fix this? For
example, why introduce the new class `JobSubmissionResult`?

Thanks!


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-87780049
  
A user I'm talking with offline depends on these changes.
I'm currently hardening the tests on Travis; once that's done, I'll merge
the changes (probably in the next 12-15 hours).


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-87840336
  
I know that the PR touches many components. I'll try to split up my work
into smaller parts.
In this case, I didn't expect at the beginning that I would need to change so
many things.

The PR is mainly about adding support for executing a Flink job on YARN in a
fire-and-forget fashion, which required some changes to the YARN client.
In the previous big change to YARN, I added support for detached YARN
sessions: you can tell the Flink YARN client to start Flink on YARN without
connecting to the AM afterwards. Users have to manage such a YARN session
with other tools (for example, `yarn application -kill` to stop it).

This change extends that feature to support submitting single Flink jobs to
YARN. But since the YARN client doesn't connect to the AM once Flink has been
started, there is no way to tell the AM to stop Flink on YARN again.
Therefore, I added a new Akka message for the ApplicationMaster,
`case class StopAMAfterJob(jobId:JobID)`. The message tells the AM to monitor
the JM until the job has finished. Once that has happened, the AM stops Flink
on YARN.
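
A rough, plain-Java sketch of that stop-after-job flow follows. It is a hypothetical simplification, not Flink's actual ApplicationMaster (which is built on Akka actors): `StopAMAfterJob` is modeled as a plain message class with a `String` job id, the JobManager is replaced by a `Supplier` of job states, and `JobStatus`, `AmJobMonitor` and `stopFlinkOnYarn()` are illustrative names.

```java
import java.util.function.Supplier;

// Hypothetical subset of job states, for illustration only.
enum JobStatus {
    CREATED, RUNNING, FINISHED, CANCELED, FAILED;

    // A job in one of these states will not change its state again.
    boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == FAILED;
    }
}

// Plain-Java stand-in for the Akka message `case class StopAMAfterJob(jobId:JobID)`.
final class StopAMAfterJob {
    final String jobId;

    StopAMAfterJob(String jobId) {
        this.jobId = jobId;
    }
}

// Hypothetical AM-side monitor: after receiving StopAMAfterJob, poll the
// JobManager until the job is terminal, then stop the YARN session.
final class AmJobMonitor {
    private final Supplier<JobStatus> jobManagerStatus; // stands in for asking the JM
    private boolean sessionStopped = false;

    AmJobMonitor(Supplier<JobStatus> jobManagerStatus) {
        this.jobManagerStatus = jobManagerStatus;
    }

    /** Waits until the job is terminal, stops the session, returns the last seen status. */
    JobStatus handle(StopAMAfterJob msg, long pollIntervalMillis) {
        JobStatus status = jobManagerStatus.get();
        while (!status.isTerminal()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and give up waiting
                return status;
            }
            status = jobManagerStatus.get();
        }
        stopFlinkOnYarn(msg.jobId, status);
        return status;
    }

    private void stopFlinkOnYarn(String jobId, JobStatus finalStatus) {
        // In a real AM this would release containers and unregister from YARN.
        System.out.println("Job " + jobId + " reached " + finalStatus + "; stopping Flink on YARN.");
        sessionStopped = true;
    }

    boolean isSessionStopped() {
        return sessionStopped;
    }
}
```

An actor-based AM would react to status messages instead of blocking in a polling loop; the loop only keeps the sketch short.
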
To get this `JobID`, I needed to make some changes to the CliFrontend and the
Client. The Client has two ways of submitting a job to Flink: an attached mode
(default) and a detached mode. The attached mode returns the
`JobExecutionResult`; the detached mode returned nothing. I created a new type
called `JobSubmissionResult`, which is returned by the detached job submission
and only contains the job ID. `JobExecutionResult` extends
`JobSubmissionResult`.
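
The relationship between the two result types can be sketched with simplified stand-ins. These are not Flink's real classes: `getNetRuntime()` and `getAllAccumulatorResults()` mirror calls visible in the `CliFrontend` diff, while the `String` job id (instead of `JobID`), the constructors, `getJobID()` and the `ResultPrinter` helper are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the result of a detached submission: only the job id is known.
class JobSubmissionResult {
    private final String jobId; // assumption: a String instead of Flink's JobID type

    JobSubmissionResult(String jobId) {
        this.jobId = jobId;
    }

    String getJobID() {
        return jobId;
    }
}

// Simplified stand-in for the result of an attached (blocking) execution,
// which additionally carries the runtime and accumulator results.
class JobExecutionResult extends JobSubmissionResult {
    private final long netRuntime;
    private final Map<String, Object> accumulators;

    JobExecutionResult(String jobId, long netRuntime, Map<String, Object> accumulators) {
        super(jobId);
        this.netRuntime = netRuntime;
        this.accumulators = Collections.unmodifiableMap(new HashMap<>(accumulators));
    }

    long getNetRuntime() {
        return netRuntime;
    }

    Map<String, Object> getAllAccumulatorResults() {
        return accumulators;
    }
}

// Hypothetical client-side helper handling both submission modes.
final class ResultPrinter {
    static String describe(JobSubmissionResult result) {
        if (result instanceof JobExecutionResult) {
            JobExecutionResult exec = (JobExecutionResult) result;
            return "Job Runtime: " + exec.getNetRuntime();
        }
        // Detached submission: no runtime or accumulators, only the job id.
        return "Job has been submitted with JobID " + result.getJobID();
    }
}
```

Because `JobExecutionResult` extends `JobSubmissionResult`, client code can hold either result in a `JobSubmissionResult` variable and print runtime and accumulators only when the attached-mode subtype is present.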


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27452529
  
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala ---
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.optimizer.util.CompilerTestBase
--- End diff --

side effect of auto format?


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27452585
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
this.sessionFilesDir = sessionFilesDir;
this.applicationId = appId;
this.detached = detached;
+   this.flinkConfig = flinkConfig;
+   this.appId = appId;
 
// get one application report manually
intialAppReport = yarnClient.getApplicationReport(appId);
String jobManagerHost = intialAppReport.getHost();
int jobManagerPort = intialAppReport.getRpcPort();
this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+   }
 
-   if(!detached) {
-   // start actor system
-   LOG.info("Start actor system.");
-   InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
-   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-   new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
+   /**
+* Connect the FlinkYarnCluster to the ApplicationMaster.
+*
+* Detached YARN sessions don't need to connect to the ApplicationMaster.
+* Detached per job YARN sessions need to connect until the required number of TaskManagers have been started.
+* 
+* @throws IOException
+*/
+   public void connectToCluster() throws IOException {
+   if(isConnected) {
+   throw new IllegalStateException("Can not connect to the cluster again");
+   }
 
-   // start application client
-   LOG.info("Start application client.");
+   // start actor system
+   LOG.info("Start actor system.");
+   InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+   new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
 
-   applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+   // start application client
+   LOG.info("Start application client.");
 
-   // instruct ApplicationClient to start a periodical status polling
-   applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+   applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient");
 
+   // instruct ApplicationClient to start a periodical status polling
+   applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
 
-   // add hook to ensure proper shutdown
-   Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
-   actorRunner = new Thread(new Runnable() {
-   @Override
-   public void run() {
-   // blocks until ApplicationMaster has been stopped
-   actorSystem.awaitTermination();
+   actorRunner = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   // blocks until ApplicationMaster has been stopped
+   actorSystem.awaitTermination();
 
-   // get final application report
-   try {
-   ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-   LOG.info("Application " + appId + " finished with state " + appReport
-   .getYarnApplicationState() + " and final state " + appReport
-   .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-   if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-   == YarnApplicationState.KILLED) {
-   LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-  

[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27451902
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -265,12 +266,32 @@ protected int run(String[] args) {
}
 
try {
-   Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
-   int parallelism = options.getParallelism();
-   int exitCode = executeProgram(program, client, parallelism);
-
-   if (yarnCluster != null) {
+   int userParallelism = options.getParallelism();
+   LOG.debug("User parallelism is set to {}", userParallelism);
+
+   Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+   LOG.debug("Client slots is set to {}", client.getMaxSlots());
+   if(client.getMaxSlots() != -1 && userParallelism == -1) {
+   logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+   "To use another parallelism, set it at the ./bin/flink client.");
+   userParallelism = client.getMaxSlots();
+   }
+   int exitCode = 0;
+
+   // check if detached per job yarn cluster is used to start flink
+   if(yarnCluster != null && yarnCluster.isDetached()) {
+   logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+   "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+   "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+   "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+   executeProgram(program, client, userParallelism, false);
+   } else {
+   // regular (blocking) execution.
+   exitCode = executeProgram(program, client, userParallelism, true);
+   }
+
+   // show YARN cluster status if its not a detached YARN cluster.
+   if (yarnCluster != null && !yarnCluster.isDetached()) {
--- End diff --

Since a `finally` block runs even when the `try` block returns, could the code
block following this check be moved into the `finally` block below?


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27452411
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
}

// 

-   
+
+   public Configuration getConfiguration() {
--- End diff --

I think we could use a static method to indicate that this is only for the
LocalExecutor:

```java
public static Configuration createConfigForLocalExecutor(LocalExecutor le) {
  
}
```


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27452062
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
program.deleteExtractedLibraries();
}
 
-   LOG.info("Program execution finished");
+   if(wait) {
+   LOG.info("Program execution finished");
+   }
 
-   // we come here after the job has finished
+   // we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
-   System.out.println("Job Runtime: " + execResult.getNetRuntime());
-   Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-   if (accumulatorsResult.size() > 0) {
-   System.out.println("Accumulator Results: ");
-   System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+   // if the job has been submitted to a detached YARN cluster, there won't be any
+   // exec results, but the object will be set (for the job id)
+   if(yarnCluster != null && yarnCluster.isDetached()) {
--- End diff --

Small style nit: please add a space after `if`/`else` before the parentheses, e.g. `if (wait)` instead of `if(wait)`.


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/542#discussion_r27452509
  
--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---
@@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);
 
-   Optimizer pc = new Optimizer(new DataStatistics());
+   Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
--- End diff --

For this and other tests, if we made a static method to create the new
Configuration, we could make it clear that this is just for the local executor:

```java
LocalExecutor.createConfigForLocalExecutor(this.executor);
```
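
A minimal sketch of the suggested static factory, using hypothetical stand-ins (`SketchConfiguration`, `SketchLocalExecutor`) rather than Flink's real `Configuration` and `LocalExecutor` classes. The single slot-count field and the configuration key are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a configuration object (string key/value pairs only).
final class SketchConfiguration {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
        values.put(key, value);
    }

    String getString(String key, String defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}

// Hypothetical, simplified local executor showing the suggested static factory.
final class SketchLocalExecutor {
    private final int taskManagerNumSlots;

    SketchLocalExecutor(int taskManagerNumSlots) {
        this.taskManagerNumSlots = taskManagerNumSlots;
    }

    // A static method named after the class makes it explicit that this configuration
    // is derived from a local executor, instead of looking like a general-purpose getter.
    static SketchConfiguration createConfigForLocalExecutor(SketchLocalExecutor le) {
        SketchConfiguration config = new SketchConfiguration();
        // Illustrative key only.
        config.setString("taskmanager.numberOfTaskSlots", String.valueOf(le.taskManagerNumSlots));
        return config;
    }
}
```

A test would then call `SketchLocalExecutor.createConfigForLocalExecutor(executor)` instead of an instance getter, which keeps the local-executor-specific intent visible at the call site.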


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-30 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/542#issuecomment-87936262
  
While it is a big patch, with the additional comments I was able to follow
your changes. The main changes look good; I assume the rest are side effects
needed to make the refactoring work.

I added some small comments on the PR; other than those, it seems ready to
merge.


[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...

2015-03-28 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/542

[FLINK-1771] Add support for submitting single jobs to a detached YARN 
session

With this change, users can submit a Flink job to a YARN cluster without
having a local client monitoring the ApplicationMaster or the job status. You
can basically fire and forget a Flink job to YARN.
To support this, the ApplicationMaster can now monitor the status of a job and
shut itself down once the job is in a terminal state.

The change also verifies that the various ways of setting the parallelism on
YARN (per job, per session) are passed through the system correctly.
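
The fallback that decides which parallelism a job gets, as shown in the `CliFrontend` diff above, can be isolated into a small helper. `ParallelismResolver` is a hypothetical name; as in the diff, `-1` stands for "not set": if the user gave no parallelism and the cluster reports its slot count, the slot count is used.

```java
// Hypothetical helper mirroring the fallback logic in the CliFrontend diff.
final class ParallelismResolver {
    // Sentinel used in the diff for "no value set".
    static final int NOT_SET = -1;

    static int resolve(int userParallelism, int clusterMaxSlots) {
        if (clusterMaxSlots != NOT_SET && userParallelism == NOT_SET) {
            // No parallelism given on the command line: use all slots the cluster reports.
            return clusterMaxSlots;
        }
        // Otherwise the user's value wins (it may still be NOT_SET if neither is known).
        return userParallelism;
    }
}
```

Keeping the user's explicit value first means a per-job setting always overrides the session's slot count.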

There was a bug in YARN container creation which made the configuration 
values for the heap offset useless. This change fixes this error.

All mentioned features and bugs are covered by the flink-yarn-tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink yarn-slots-test-rebased

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #542


commit afdc9ac52287f35bdc1413c64f8abbc06efbb3ec
Author: Robert Metzger rmetz...@apache.org
Date:   2015-03-13T14:53:51Z

[FLINK-1771] Add support for submitting single jobs to a detached YARN 
session

With this change, users can submit a Flink job to a YARN cluster without 
having a local client monitoring the Application Master or job. You can 
basically fire and forget a Flink job to YARN.
For supporting this, the ApplicationMaster can now monitor the status of a 
job and shutdown itself once it is in a terminal state.

The change also verifies that various ways of setting the parallelism on 
YARN are passed through the system correctly (per job, session).

There was a bug in YARN container creation which made the configuration 
values for the heap offset useless. This change fixes this error.

All mentioned features and bugs are covered by the flink-yarn-tests.



