[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-87967427

Thank you very much for the review, Henry! I addressed your feedback where applicable. I'll merge the change now.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456302

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
         this.sessionFilesDir = sessionFilesDir;
         this.applicationId = appId;
         this.detached = detached;
+        this.flinkConfig = flinkConfig;
+        this.appId = appId;
         // get one application report manually
         intialAppReport = yarnClient.getApplicationReport(appId);
         String jobManagerHost = intialAppReport.getHost();
         int jobManagerPort = intialAppReport.getRpcPort();
         this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+    }
-        if(!detached) {
-            // start actor system
-            LOG.info("Start actor system.");
-            InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
-            actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-                    new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
+    /**
+     * Connect the FlinkYarnCluster to the ApplicationMaster.
+     *
+     * Detached YARN sessions don't need to connect to the ApplicationMaster.
+     * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started.
+     *
+     * @throws IOException
+     */
+    public void connectToCluster() throws IOException {
+        if(isConnected) {
+            throw new IllegalStateException("Can not connect to the cluster again");
+        }
-            // start application client
-            LOG.info("Start application client.");
+        // start actor system
+        LOG.info("Start actor system.");
+        InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+        actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+                new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
-            applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+        // start application client
+        LOG.info("Start application client.");
-            // instruct ApplicationClient to start a periodical status polling
-            applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+        applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient");
+        // instruct ApplicationClient to start a periodical status polling
+        applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
-            // add hook to ensure proper shutdown
-            Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-            actorRunner = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    // blocks until ApplicationMaster has been stopped
-                    actorSystem.awaitTermination();
+        actorRunner = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // blocks until ApplicationMaster has been stopped
+                actorSystem.awaitTermination();
-                    // get final application report
-                    try {
-                        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-                        LOG.info("Application " + appId + " finished with state " + appReport
-                                .getYarnApplicationState() + " and final state " + appReport
-                                .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-                        if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-                                == YarnApplicationState.KILLED) {
-                            LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456202

--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---
@@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception {
     private OptimizedPlan compileProgram(String jobName) {
         Plan p = createProgramPlan(jobName);
-        Optimizer pc = new Optimizer(new DataStatistics());
+        Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
--- End diff --

The executor here is of type `ForkableFlinkMiniCluster`. We have too many local test clusters in Flink :(
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456299

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -265,12 +266,32 @@ protected int run(String[] args) {
         }
         try {
-            Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
-            int parallelism = options.getParallelism();
-            int exitCode = executeProgram(program, client, parallelism);
-
-            if (yarnCluster != null) {
+            int userParallelism = options.getParallelism();
+            LOG.debug("User parallelism is set to {}", userParallelism);
+
+            Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+            LOG.debug("Client slots is set to {}", client.getMaxSlots());
+            if(client.getMaxSlots() != -1 && userParallelism == -1) {
+                logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+                        "To use another parallelism, set it at the ./bin/flink client.");
+                userParallelism = client.getMaxSlots();
+            }
+            int exitCode = 0;
+
+            // check if detached per job yarn cluster is used to start flink
+            if(yarnCluster != null && yarnCluster.isDetached()) {
+                logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+                        "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+                        "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+                        "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+                executeProgram(program, client, userParallelism, false);
+            } else {
+                // regular (blocking) execution.
+                exitCode = executeProgram(program, client, userParallelism, true);
+            }
+
+            // show YARN cluster status if its not a detached YARN cluster.
+            if (yarnCluster != null && !yarnCluster.isDetached()) {
--- End diff --

I leave the code there to use the nicer error handling of the enclosing try/catch block. In catch, we use the `handleError()` method to show a nice error message.
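For readers skimming the diff above, the new parallelism handling in `run()` comes down to a small fallback rule: if the user did not set a parallelism but the cluster reports a slot count, use the slot count. The following is only a sketch of that rule, not code from the PR. `ParallelismSketch`, `resolveParallelism` and `UNSET` are hypothetical names; the only thing taken from the diff is that `-1` marks "not set" for both `options.getParallelism()` and `client.getMaxSlots()`.

```java
// Illustrative only: names in this class are hypothetical, not Flink API.
final class ParallelismSketch {

    /** Sentinel for "not set", mirroring the -1 checks in the quoted diff. */
    static final int UNSET = -1;

    /**
     * Returns the parallelism to use for the job.
     * Falls back to the cluster's slot count only when the user gave no value
     * and the cluster actually reported one.
     */
    static int resolveParallelism(int userParallelism, int clusterMaxSlots) {
        if (clusterMaxSlots != UNSET && userParallelism == UNSET) {
            return clusterMaxSlots;
        }
        return userParallelism;
    }
}
```

With this rule, an explicit user value always wins, and a job submitted without one to a cluster that reports no slots keeps the `-1` value unchanged.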
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456233

--- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
     }
     //
-
+
+    public Configuration getConfiguration() {
--- End diff --

Thanks. I've changed the method.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456244

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
             program.deleteExtractedLibraries();
         }
-        LOG.info("Program execution finished");
+        if(wait) {
+            LOG.info("Program execution finished");
+        }
-        // we come here after the job has finished
+        // we come here after the job has finished (or the job has been submitted)
         if (execResult != null) {
-            System.out.println("Job Runtime: " + execResult.getNetRuntime());
-            Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-            if (accumulatorsResult.size() > 0) {
-                System.out.println("Accumulator Results: ");
-                System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+            // if the job has been submitted to a detached YARN cluster, there won't be any
+            // exec results, but the object will be set (for the job id)
+            if(yarnCluster != null && yarnCluster.isDetached()) {
--- End diff --

Done. I think we should add a checkstyle rule for this.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27456228

--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala ---
@@ -18,12 +18,12 @@
 package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.optimizer.util.CompilerTestBase
--- End diff --

Yes, IntelliJ sometimes relocates imports when refactoring.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/542
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-88136878

Awesome!
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-87817661

@rmetzger, the PR is too large to review effectively =(
Could you kindly summarize the significant changes made to fix this? For example, why introduce the new class `JobSubmissionResult`?

Thanks!
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-87780049

A user I'm talking with offline depends on these changes. I'm currently hardening the tests on Travis; once that's done, I'll merge the changes (probably in the next 12-15 hours).
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-87840336

I know that the PR touches many components. I'll try to split up my work into smaller parts. In this case, I didn't expect at the beginning that I would need to change so many things.

The PR is mainly about adding support for executing a Flink job on YARN in a fire-and-forget fashion. Therefore, I needed to make some changes to the YARN client.
In the previous big change to YARN, I added support for a detached YARN session, so that you can tell the Flink YARN client to start Flink on YARN without connecting to the AM afterwards. Users have to manage such a YARN session using other tools afterwards (for example `yarn application -kill` to stop it).

This change takes the feature further to support single Flink jobs being submitted to YARN. But since the YARN client doesn't connect to the AM once Flink has been started, there is no way to tell the AM to stop Flink on YARN again.
In this change, I add a new Akka message for the ApplicationMaster: `case class StopAMAfterJob(jobId:JobID)`. The message tells the AM to monitor the JM until the job has finished. Once that has happened, the AM stops Flink on YARN.

To get this `JobID`, I needed to make some changes to the CliFrontend / Client. The Client has two ways of submitting a job to Flink: an attached mode (default) and a detached mode. The attached mode returns the `JobExecutionResult`; the detached mode was returning nothing. I created a new type called `JobSubmissionResult`, which is returned by the detached job submission. It only contains the job id. `JobExecutionResult` extends `JobSubmissionResult`.
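The relationship between the two result types described in this comment can be pictured roughly as follows. This is a simplified sketch under stated assumptions, not the classes from the PR: the job id is a plain `String` here rather than Flink's `JobID`, the execution result carries only a net runtime, and the enclosing `SubmissionResultSketch` class and its `describe` helper are hypothetical.

```java
// Simplified stand-ins for illustration; the real Flink classes differ in detail.
final class SubmissionResultSketch {

    /** Returned by a detached submission: only the job id is known. */
    static class JobSubmissionResult {
        private final String jobId; // assumption: a String instead of Flink's JobID

        JobSubmissionResult(String jobId) {
            this.jobId = jobId;
        }

        String getJobId() {
            return jobId;
        }
    }

    /** Returned by an attached (blocking) submission: job id plus execution details. */
    static class JobExecutionResult extends JobSubmissionResult {
        private final long netRuntime;

        JobExecutionResult(String jobId, long netRuntime) {
            super(jobId);
            this.netRuntime = netRuntime;
        }

        long getNetRuntime() {
            return netRuntime;
        }
    }

    /** Hypothetical helper: callers can always read the job id, runtime only if attached. */
    static String describe(JobSubmissionResult result) {
        if (result instanceof JobExecutionResult) {
            long runtime = ((JobExecutionResult) result).getNetRuntime();
            return "Job " + result.getJobId() + " finished after " + runtime + " ms";
        }
        return "Job " + result.getJobId() + " submitted";
    }
}
```

Because the execution result extends the submission result, code that only needs the job id (such as sending it to the ApplicationMaster) can accept either type.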
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452529

--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala ---
@@ -18,12 +18,12 @@
 package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.optimizer.util.CompilerTestBase
--- End diff --

Side effect of auto-format?
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452585

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId,
         this.sessionFilesDir = sessionFilesDir;
         this.applicationId = appId;
         this.detached = detached;
+        this.flinkConfig = flinkConfig;
+        this.appId = appId;
         // get one application report manually
         intialAppReport = yarnClient.getApplicationReport(appId);
         String jobManagerHost = intialAppReport.getHost();
         int jobManagerPort = intialAppReport.getRpcPort();
         this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
+    }
-        if(!detached) {
-            // start actor system
-            LOG.info("Start actor system.");
-            InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
-            actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-                    new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
+    /**
+     * Connect the FlinkYarnCluster to the ApplicationMaster.
+     *
+     * Detached YARN sessions don't need to connect to the ApplicationMaster.
+     * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started.
+     *
+     * @throws IOException
+     */
+    public void connectToCluster() throws IOException {
+        if(isConnected) {
+            throw new IllegalStateException("Can not connect to the cluster again");
+        }
-            // start application client
-            LOG.info("Start application client.");
+        // start actor system
+        LOG.info("Start actor system.");
+        InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM
+        actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+                new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
-            applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+        // start application client
+        LOG.info("Start application client.");
-            // instruct ApplicationClient to start a periodical status polling
-            applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+        applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient");
+        // instruct ApplicationClient to start a periodical status polling
+        applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
-            // add hook to ensure proper shutdown
-            Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-            actorRunner = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    // blocks until ApplicationMaster has been stopped
-                    actorSystem.awaitTermination();
+        actorRunner = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // blocks until ApplicationMaster has been stopped
+                actorSystem.awaitTermination();
-                    // get final application report
-                    try {
-                        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-                        LOG.info("Application " + appId + " finished with state " + appReport
-                                .getYarnApplicationState() + " and final state " + appReport
-                                .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-                        if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-                                == YarnApplicationState.KILLED) {
-                            LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27451902

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -265,12 +266,32 @@ protected int run(String[] args) {
         }
         try {
-            Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
-            int parallelism = options.getParallelism();
-            int exitCode = executeProgram(program, client, parallelism);
-
-            if (yarnCluster != null) {
+            int userParallelism = options.getParallelism();
+            LOG.debug("User parallelism is set to {}", userParallelism);
+
+            Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+            LOG.debug("Client slots is set to {}", client.getMaxSlots());
+            if(client.getMaxSlots() != -1 && userParallelism == -1) {
+                logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+                        "To use another parallelism, set it at the ./bin/flink client.");
+                userParallelism = client.getMaxSlots();
+            }
+            int exitCode = 0;
+
+            // check if detached per job yarn cluster is used to start flink
+            if(yarnCluster != null && yarnCluster.isDetached()) {
+                logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+                        "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+                        "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+                        "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+                executeProgram(program, client, userParallelism, false);
+            } else {
+                // regular (blocking) execution.
+                exitCode = executeProgram(program, client, userParallelism, true);
+            }
+
+            // show YARN cluster status if its not a detached YARN cluster.
+            if (yarnCluster != null && !yarnCluster.isDetached()) {
--- End diff --

Since `finally` trumps the `return` statement, could the code block following this check be moved to the `finally` block below?
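As background for this suggestion, the Java rule in question is that a `finally` block runs even when the `try` block returns, and a `return` inside `finally` replaces the value the `try` block was about to return. The sketch below demonstrates both behaviours in isolation. It is not code from the PR; `FinallySketch` and its method names are made up for illustration.

```java
// Standalone demonstration of try/finally semantics; not taken from CliFrontend.
final class FinallySketch {

    /** The finally block still runs although the try block returns. */
    static int runWithCleanup(StringBuilder log) {
        try {
            log.append("execute;");
            return 0;
        } finally {
            // Executed after the return value is computed, before the method exits.
            log.append("cleanup;");
        }
    }

    /** A return inside finally overrides the return from the try block. */
    @SuppressWarnings("finally")
    static int returnInFinally() {
        try {
            return 0;
        } finally {
            return 1; // this value wins; generally considered bad style
        }
    }
}
```

Moving status output into `finally` would therefore run it on every exit path, including the error path, which is the trade-off rmetzger's reply addresses by keeping the code inside the `try` block.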
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452411

--- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
@@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
     }
     //
-
+
+    public Configuration getConfiguration() {
--- End diff --

I think we could use a static method to indicate this is only for LocalExecutor:

```java
public static Configuration createConfigForLocalExecutor(LocalExecutor le) {
}
```
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452062

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
             program.deleteExtractedLibraries();
         }
-        LOG.info("Program execution finished");
+        if(wait) {
+            LOG.info("Program execution finished");
+        }
-        // we come here after the job has finished
+        // we come here after the job has finished (or the job has been submitted)
         if (execResult != null) {
-            System.out.println("Job Runtime: " + execResult.getNetRuntime());
-            Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-            if (accumulatorsResult.size() > 0) {
-                System.out.println("Accumulator Results: ");
-                System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+            // if the job has been submitted to a detached YARN cluster, there won't be any
+            // exec results, but the object will be set (for the job id)
+            if(yarnCluster != null && yarnCluster.isDetached()) {
--- End diff --

Small style nit: add a space after `if`/`else` and before the parentheses.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452509

--- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---
@@ -80,7 +80,7 @@ public String getExecutionPlan() throws Exception {
     private OptimizedPlan compileProgram(String jobName) {
         Plan p = createProgramPlan(jobName);
-        Optimizer pc = new Optimizer(new DataStatistics());
+        Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration());
--- End diff --

For this and other tests, if we made a static method to create a new Configuration, we could make it clear this is just for the local executor:

```
LocalExecutor.createConfigForLocalExecutor(this.executor);
```
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
Github user hsaputra commented on the pull request:

    https://github.com/apache/flink/pull/542#issuecomment-87936262

While it is a big patch, with the additional comments I was able to follow your changes.
The main changes look good, so I assume the rest is a side effect of making the refactoring work.

I added some small comments on the PR; other than those, it seems ready to merge.
[GitHub] flink pull request: [FLINK-1771] Add support for submitting single...
GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/542

    [FLINK-1771] Add support for submitting single jobs to a detached YARN session

    With this change, users can submit a Flink job to a YARN cluster without having a local client monitoring the Application Master or job status. You can basically fire and forget a Flink job to YARN.
    For supporting this, the ApplicationMaster can now monitor the status of a job and shut itself down once it is in a terminal state.

    The change also verifies that various ways of setting the parallelism on YARN are passed through the system correctly (per job, session).

    There was a bug in YARN container creation which made the configuration values for the heap offset useless. This change fixes this error.

    All mentioned features and bugs are covered by the flink-yarn-tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink yarn-slots-test-rebased

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #542

commit afdc9ac52287f35bdc1413c64f8abbc06efbb3ec
Author: Robert Metzger rmetz...@apache.org
Date:   2015-03-13T14:53:51Z

    [FLINK-1771] Add support for submitting single jobs to a detached YARN session

    With this change, users can submit a Flink job to a YARN cluster without having a local client monitoring the Application Master or job. You can basically fire and forget a Flink job to YARN.
    For supporting this, the ApplicationMaster can now monitor the status of a job and shutdown itself once it is in a terminal state.

    The change also verifies that various ways of setting the parallelism on YARN are passed through the system correctly (per job, session).

    There was a bug in YARN container creation which made the configuration values for the heap offset useless. This change fixes this error.

    All mentioned features and bugs are covered by the flink-yarn-tests.
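The behaviour described in this pull request, where the ApplicationMaster watches a job and shuts down once the job reaches a terminal state, follows a simple decision rule. The sketch below mirrors only that rule in plain Java. It is an assumption-laden illustration: the actual ApplicationMaster is an Akka actor, and the `JobStatus` values, `isTerminal`, and `pollsUntilShutdown` here are hypothetical names rather than Flink's API.

```java
// Plain-Java illustration of "stop once the job is terminal"; not Flink code.
final class StopAfterJobSketch {

    /** Assumed set of job states; which ones count as terminal is an assumption too. */
    enum JobStatus {
        RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true);

        private final boolean terminal;

        JobStatus(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminal() {
            return terminal;
        }
    }

    /**
     * Walks through a sequence of observed job states, as a monitor would while polling.
     * Returns how many observations were needed before shutdown would be triggered,
     * or -1 if no terminal state was observed.
     */
    static int pollsUntilShutdown(JobStatus... observed) {
        for (int i = 0; i < observed.length; i++) {
            if (observed[i].isTerminal()) {
                return i + 1;
            }
        }
        return -1;
    }
}
```

For example, observing `RUNNING, RUNNING, FINISHED` would trigger shutdown on the third poll, while a job that is only ever seen as `RUNNING` keeps the cluster alive.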