[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2083 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68590084

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     }
 
     /**
-     * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-     * message to the job manager.
+     * Asks the JobManager to dispose a savepoint.
+     *
+     * There are two options for this:
+     *
+     * Either the job the savepoint belongs to is still running, in which
+     * case the user code class loader of the job is used.
+     * Or the job terminated, in which case the user JARs have to be
+     * uploaded before disposing the savepoint.
+     *
      */
-    private int disposeSavepoint(SavepointOptions options, String savepointPath) {
-        try {
-            ActorGateway jobManager = getJobManagerGateway(options);
-            logAndSysout("Disposing savepoint '" + savepointPath + "'.");
-            Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+    private int disposeSavepoint(SavepointOptions options) throws Throwable {
+        String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-            Object result;
-            try {
-                logAndSysout("Waiting for response...");
-                result = Await.result(response, clientTimeout);
+        JobID jobId = options.getJobId();
+        String jarFile = options.getJarFilePath();
+
+        if (jobId != null && jarFile != null) {
+            throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+        }
+
+        ActorGateway jobManager = getJobManagerGateway(options);
+
+        final Future response;
+        if (jobId != null) {
+            // Dispose with class loader of running job
+            logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+                    clientTimeout);
+        } else if (jarFile != null) {
+            logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+            // Dispose with uploaded user code loader
+            Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+            PackagedProgram program = new PackagedProgram(
+                    new File(jarFile),
+                    options.getClasspaths(),
+                    options.getEntryPointClass(),
+                    options.getProgramArgs());
+            FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
+
+            JobGraph jobGraph;
+            if (flinkPlan instanceof StreamingPlan) {
+                jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+            } else {
+                JobGraphGenerator gen = new JobGraphGenerator(this.config);
+                jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
             }
-            catch (Exception e) {
-                throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
+
+            for (URL jar : program.getAllLibraries()) {
+                try {
+                    jobGraph.addJar(new Path(jar.toURI()));
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException("URL is invalid. This should not happen.", e);
+                }
             }
+            jobGraph.setClasspaths(program.getClasspaths());
+
+            logAndSysout("Uploading JAR files for savepoint disposal.");
+            JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
+
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepointWithClassLoader(
+                            savepointPath,
+                            jobGraph.getUserJarBlobKeys(),
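The hunk above picks one of two disposal paths depending on whether a job ID or a JAR was given. Its guard (`jobId != null && jarFile != null`) rejects the case where both are set, while its error message describes the case where neither is set. A minimal sketch of a check that covers both cases is shown below; the class name, the `Mode` enum, and the use of a plain `String` for the job ID are hypothetical simplifications, not Flink's API.

```java
import java.util.Objects;

// Hypothetical sketch: validates the disposal arguments and picks a disposal path.
// The job ID is a plain String here; Flink itself uses a dedicated JobID type.
final class DisposalModeSketch {

    enum Mode {
        RUNNING_JOB_CLASS_LOADER, // use the class loader of the still-running job
        UPLOADED_JARS             // upload the user JARs first, then dispose
    }

    static Mode selectMode(String savepointPath, String jobId, String jarFile) {
        Objects.requireNonNull(savepointPath, "Savepoint path");
        if (jobId == null && jarFile == null) {
            throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
        }
        if (jobId != null && jarFile != null) {
            throw new IllegalArgumentException("Specify either a Job ID or a JAR, not both.");
        }
        return jobId != null ? Mode.RUNNING_JOB_CLASS_LOADER : Mode.UPLOADED_JARS;
    }

    public static void main(String[] args) {
        // Prints RUNNING_JOB_CLASS_LOADER, then UPLOADED_JARS.
        System.out.println(selectMode("/savepoints/savepoint-1", "job-1", null));
        System.out.println(selectMode("/savepoints/savepoint-1", null, "/jobs/my-job.jar"));
    }
}
```

Rejecting both "neither" and "both" up front keeps the later `if`/`else if` free of an implicit fall-through case.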
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68588080

--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 
 Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 
-    Syntax: savepoint [OPTIONS]
-    "savepoint" action options:
-      -d,--dispose        Disposes an existing savepoint.
-      -m,--jobmanager     Address of the JobManager (master) to which
--- End diff --

I think it should be still there but let me check again.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68588625

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
[...]
+        } else if (jarFile != null) {
+            logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+            // Dispose with uploaded user code loader
+            Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+            PackagedProgram program = new PackagedProgram(
--- End diff --

Oh, good catch!
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68587921

--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
         formatter.setSyntaxPrefix(" \"savepoint\" action options:");
         formatter.printHelp(" ", getSavepointOptions(new Options()));
         System.out.println();
+        System.out.println("\n Examples:");
+        System.out.println(" - Trigger savepoint: bin/flink savepoint ");
+        System.out.println(" - Dispose savepoint:");
+        System.out.println("* For a running job: bin/flink savepoint -d ");
+        System.out.println("* For a terminated job: bin/flink savepoint -d -j [-c -C ]");
--- End diff --

True, that's inconsistent.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68581644

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
[...]
+            jobGraph.setClasspaths(program.getClasspaths());
+
+            logAndSysout("Uploading JAR files for savepoint disposal.");
+            JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
--- End diff --

Maybe we could also refactor the current code so that the `JobGraph` no longer does the uploading of the jars to the blob server. Then we wouldn't have to construct a `JobGraph` at all. I also think that it shouldn't be the responsibility of the `JobGraph` to upload the user code jars but simply to store the file names.
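The refactoring suggested above keeps upload logic out of the job description, which would then only record file names. A minimal sketch of that split follows, assuming a hypothetical `JarUploader` interface in place of the blob server client; `JobDescription`, `JarUploader`, and `uploadJars` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the job description only stores JAR paths; uploading is a separate step.
final class JarUploadSketch {

    /** Stand-in for a blob server client: uploads one file and returns a key for it. */
    interface JarUploader {
        String upload(String jarPath);
    }

    /** Records JAR paths and never talks to the blob server itself. */
    static final class JobDescription {
        private final List<String> jarPaths = new ArrayList<>();

        void addJar(String jarPath) {
            jarPaths.add(jarPath);
        }

        List<String> getJarPaths() {
            return Collections.unmodifiableList(jarPaths);
        }
    }

    /** Uploads the given JARs in order and returns the keys in the same order. */
    static List<String> uploadJars(List<String> jarPaths, JarUploader uploader) {
        List<String> keys = new ArrayList<>(jarPaths.size());
        for (String jarPath : jarPaths) {
            keys.add(uploader.upload(jarPath));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Savepoint disposal only needs the paths, so no job description is built here.
        List<String> libraries = Arrays.asList("/tmp/user-job.jar", "/tmp/lib/dependency.jar");
        List<String> keys = uploadJars(libraries, path -> "key:" + path);
        System.out.println(keys); // [key:/tmp/user-job.jar, key:/tmp/lib/dependency.jar]
    }
}
```

With this shape, job submission and savepoint disposal can share `uploadJars`, and only submission needs to build a full job description.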
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68580451

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
[...]
+        } else if (jarFile != null) {
+            logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+            // Dispose with uploaded user code loader
+            Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+            PackagedProgram program = new PackagedProgram(
--- End diff --

I think we have to call `program.deleteExtractedLibraries()` at the end so that we clean up possibly extracted libraries.
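Cleanup of this kind maps naturally onto `try`/`finally`, so the extracted libraries are also removed when optimization, upload, or the disposal request fails. In the sketch below, `ExtractedProgram` is a hypothetical stand-in for `PackagedProgram`; only the method name `deleteExtractedLibraries()` is taken from the comment above, and here it merely clears an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: ExtractedProgram stands in for a program that unpacks nested JARs.
final class CleanupSketch {

    static final class ExtractedProgram {
        private final List<String> extractedFiles = new ArrayList<>();

        ExtractedProgram(String jarFile) {
            // Pretend that opening the JAR extracted one nested library to a temp location.
            extractedFiles.add(jarFile + ".nested-lib.jar");
        }

        List<String> getExtractedFiles() {
            return extractedFiles;
        }

        /** Name borrowed from the review comment; here it only clears the in-memory list. */
        void deleteExtractedLibraries() {
            extractedFiles.clear();
        }
    }

    /** Runs the disposal step and always removes extracted libraries, even on failure. */
    static void disposeWithCleanup(ExtractedProgram program, Runnable disposalStep) {
        try {
            disposalStep.run();
        } finally {
            program.deleteExtractedLibraries();
        }
    }

    public static void main(String[] args) {
        ExtractedProgram program = new ExtractedProgram("/jobs/my-job.jar");
        try {
            disposeWithCleanup(program, () -> {
                throw new IllegalStateException("disposal failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getMessage());
        }
        System.out.println("Remaining extracted files: " + program.getExtractedFiles().size()); // 0
    }
}
```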
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68580226

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
[...]
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepointWithClassLoader(
+                            savepointPath,
+                            jobGraph.getUserJarBlobKeys(),
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68579929

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
[...]
-    private int disposeSavepoint(SavepointOptions options, String savepointPath) {
-        try {
-            ActorGateway jobManager = getJobManagerGateway(options);
-            logAndSysout("Disposing savepoint '" + savepointPath + "'.");
-            Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+    private int disposeSavepoint(SavepointOptions options) throws Throwable {
--- End diff --

Have we actually tested somewhere that the jars we're uploading to the JobManager allow us to dispose the given savepoint with user code?
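One way to check this is to resolve a user class by name through a class loader built from the uploaded JAR files. The sketch below assumes a plain `java.net.URLClassLoader` with parent-first delegation, which is not necessarily how the JobManager builds its user code class loader; `loaderFor`, `canLoad`, and the class name `com.example.MyFoldFunction` are hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: build a loader from JAR files and check that a class resolves through it.
final class UserCodeLoaderSketch {

    /** Creates a parent-first URLClassLoader over the given JAR files. */
    static URLClassLoader loaderFor(List<File> jars, ClassLoader parent) throws MalformedURLException {
        URL[] urls = new URL[jars.size()];
        for (int i = 0; i < jars.size(); i++) {
            urls[i] = jars.get(i).toURI().toURL();
        }
        return new URLClassLoader(urls, parent);
    }

    /** Returns true if the class can be found through the loader, without initializing it. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // With no user JARs, only classes visible to the parent loader resolve.
        try (URLClassLoader loader = loaderFor(Collections.<File>emptyList(), ClassLoader.getSystemClassLoader())) {
            System.out.println(canLoad(loader, "java.util.ArrayList"));        // true
            System.out.println(canLoad(loader, "com.example.MyFoldFunction")); // false
        }
    }
}
```

A real test would pass the job's JAR and assert that a state class from that JAR resolves only through the resulting loader.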
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68572931

--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
[...]
+        System.out.println(" - Dispose savepoint:");
+        System.out.println("* For a running job: bin/flink savepoint -d ");
+        System.out.println("* For a terminated job: bin/flink savepoint -d -j [-c -C ]");
--- End diff --

Is it consistent that we specify the job id without an option whereas we specify the jar with `-j`? Shouldn't only those parameters which are used everywhere be used without an option flag?
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2083#discussion_r68571489

--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
[...]
-    "savepoint" action options:
-      -d,--dispose        Disposes an existing savepoint.
-      -m,--jobmanager     Address of the JobManager (master) to which
--- End diff --

What happened to the jobmanager option?
GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2083

[FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoints

Disposing savepoints via the JobManager fails for state handles or descriptors that contain user classes (for example, custom folding state or RocksDB handles).

With this change, the user has to provide, when disposing a savepoint, either the job ID of a running job (so that the user code class loader of that job is used) or the job JARs.

This version breaks the API, as the CLI now requires either a JobID or a JAR. I think this is reasonable, because the current approach only works for a subset of the available state variants. We can port this back to 1.0.4 and make the JobID or JAR arguments optional. What do you think?

I've tested this with a job running on RocksDB state, both while the job was running and after it had terminated. This did not work with the current 1.0.3 version.

Ideally, we will get rid of the whole disposal business once savepoints are properly self-contained. I'm going to open a JIRA issue with a proposal to do so soon.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3713-dispose_savepoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2083.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2083

commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi
Date: 2016-06-08T08:59:24Z

    [FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoint
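The failure described in this pull request comes from deserializing objects whose classes the JobManager's default class loader cannot see. A common Java pattern for this is an `ObjectInputStream` that resolves classes through a supplied loader. The standalone sketch below shows that pattern with a JDK class so that it runs without any user JAR; the class and method names are hypothetical and not taken from Flink's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical sketch: deserialize state with an explicitly supplied (user code) class loader.
final class UserClassLoaderDeserializationSketch {

    /** ObjectInputStream that resolves classes through a given class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to default resolution (this also handles primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader userCodeLoader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), userCodeLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        ArrayList<String> state = new ArrayList<>(Arrays.asList("a", "b"));
        byte[] bytes = serialize(state);
        // On the JobManager, this would be a loader built from the job's JARs instead.
        Object restored = deserialize(bytes, ClassLoader.getSystemClassLoader());
        System.out.println(restored); // [a, b]
    }
}
```

Passing the loader explicitly, rather than relying on the loader that loaded the deserialization code, is what lets classes from a running job or from uploaded JARs resolve.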