[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-07-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2083


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68590084
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepointWithClassLoader(
+   savepointPath,
+   
jobGraph.getUserJarBlobKeys(),
+   

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588080
  
--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 
 Action "savepoint" triggers savepoints for a running job or disposes 
existing ones.
 
-  Syntax: savepoint [OPTIONS] 
-  "savepoint" action options:
- -d,--disposeDisposes an existing savepoint.
- -m,--jobmanager Address of the JobManager (master) to 
which
--- End diff --

I think it should be still there but let me check again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588625
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
--- End diff --

Oh, good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68587921
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
formatter.setSyntaxPrefix("  \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
System.out.println();
+   System.out.println("\n  Examples:");
+   System.out.println("  - Trigger savepoint: bin/flink savepoint 
");
+   System.out.println("  - Dispose savepoint:");
+   System.out.println("* For a running job: bin/flink 
savepoint -d  ");
+   System.out.println("* For a terminated job: bin/flink 
savepoint -d  -j  [-c  -C ]");
--- End diff --

True, that's inconsistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68581644
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
--- End diff --

Maybe we could also refactor the current code so that the `JobGraph` no 
longer does the uploading of the jars to the blob server. Then we wouldn't have 
to construct a `JobGraph` at all. I also think that it shouldn't be the 
responsibility of the `JobGraph` to upload the user code jars but simply to 
store the file names.


---
If your project is set 

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580451
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
--- End diff --

I think we have to call `program.deleteExtractedLibraries()` at the end so 
that we clean up possibly extracted libraries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580226
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepointWithClassLoader(
+   savepointPath,
+   
jobGraph.getUserJarBlobKeys(),
+   

[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68579929
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
--- End diff --

Have we actually tested somewhere that the jars we're uploading to the 
JobManager allow us to dispose the given savepoint with user code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68572931
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
formatter.setSyntaxPrefix("  \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
System.out.println();
+   System.out.println("\n  Examples:");
+   System.out.println("  - Trigger savepoint: bin/flink savepoint 
");
+   System.out.println("  - Dispose savepoint:");
+   System.out.println("* For a running job: bin/flink 
savepoint -d  ");
+   System.out.println("* For a terminated job: bin/flink 
savepoint -d  -j  [-c  -C ]");
--- End diff --

Is it consistent that we specify the job id without an option whereas we 
specify the jar with `-j`? Shouldn't only those parameters which are used 
everywhere be used without an option flag?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-27 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68571489
  
--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 
 Action "savepoint" triggers savepoints for a running job or disposes 
existing ones.
 
-  Syntax: savepoint [OPTIONS] 
-  "savepoint" action options:
- -d,--disposeDisposes an existing savepoint.
- -m,--jobmanager Address of the JobManager (master) to 
which
--- End diff --

What happened to the jobmanager option?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2083: [FLINK-3713] [clients, runtime] Use user code clas...

2016-06-08 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2083

[FLINK-3713] [clients, runtime] Use user code class loader when disposing 
savepoints

Disposing savepoints via the JobManager fails for state handles or 
descriptors, which contain user classes (for example custom folding state or 
RocksDB handles).

With this change, the user has to provide the job ID of a running job when 
disposing a savepoint in order to use the user code class loader of that job or 
provide the job JARs.

This version breaks the API as the CLI now requires either a JobID or a 
JAR. I think this is reasonable, because the current approach only works for a 
subset of the available state variants.

We can port this back for 1.0.4 and make the JobID or JAR arguments 
optional. What do you think?

I've tested this with a job running on RocksDB state both while the job was 
running and after it terminated. This was not working with the current 1.0.3 
version.

Ideally, we will get rid of the whole disposal business when we make 
savepoints properly self-contained. I'm going to open a JIRA issue with a 
proposal to do so soon.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 3713-dispose_savepoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2083


commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi 
Date:   2016-06-08T08:59:24Z

[FLINK-3713] [clients, runtime] Use user code class loader when disposing 
savepoint

Disposing savepoints via the JobManager fails for state handles or 
descriptors,
which contain user classes (for example custom folding state or RocksDB 
handles).

With this change, the user has to provide the job ID of a running job when 
disposing
a savepoint in order to use the user code class loader of that job or 
provide the
job JARs.

This version breaks the API as the CLI now requires either a JobID or a 
JAR. I think
this is reasonable, because the current approach only works for a subset of 
the
available state variants.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---