[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382418#comment-15382418
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2083


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372973#comment-15372973
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083
  
If there are no objections, I would like to merge this.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15364246#comment-15364246
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083
  
I've addressed the comments:
- Moved the JAR uploading to a helper:

```java
// Retrieve blob server address and upload JARs to server
uploadJarFiles(ActorGateway, FiniteDuration, List);
// Upload JARs to server 
uploadJarFiles(InetSocketAddress, List);
```
I did not remove the `JobGraph` upload JARs method, but it now calls 
these helpers. I decided against removing it, because it's called in multiple 
places with the same pattern (upload, get BLOB keys, set BLOB keys).
- I extract the libraries as you suggested (no need for the job graph now)
- Removed the job ID variant as it was overloading disposal too much and 
just stuck to the JAR variant, which is now optional (no script API break)
- State disposal errors are now propagated and the a failed disposal with 
`ClassNotFoundException` gives a hint to provide the JAR file
- I've added an automated test with custom KV state in `ClassLoaderITCase`




> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351186#comment-15351186
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083
  
Thanks for review. I will propagate the errors, make the job ID/JAR 
arguments optional, and try to simplify parts of the CLI as you suggested.

Regarding including the RocksDB jar in dist, I think that should be handled 
as a separate issue.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351180#comment-15351180
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68590084
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepointWithClassLoader(
 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351166#comment-15351166
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588625
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
--- End diff --

Oh, good catch!


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351161#comment-15351161
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588080
  
--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 
 Action "savepoint" triggers savepoints for a running job or disposes 
existing ones.
 
-  Syntax: savepoint [OPTIONS] 
-  "savepoint" action options:
- -d,--disposeDisposes an existing savepoint.
- -m,--jobmanager Address of the JobManager (master) to 
which
--- End diff --

I think it should be still there but let me check again.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351157#comment-15351157
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68587921
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
formatter.setSyntaxPrefix("  \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
System.out.println();
+   System.out.println("\n  Examples:");
+   System.out.println("  - Trigger savepoint: bin/flink savepoint 
");
+   System.out.println("  - Dispose savepoint:");
+   System.out.println("* For a running job: bin/flink 
savepoint -d  ");
+   System.out.println("* For a terminated job: bin/flink 
savepoint -d  -j  [-c  -C ]");
--- End diff --

True, that's inconsistent.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351075#comment-15351075
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2083
  
Good work @uce :-)

I agree that it currently it is a bit clumsy to discard savepoints of jobs 
which are no longer running. From the user perspective it should be as easy as 
possible. 

I also think that it would be a good idea to add the RocksDB jar to the 
flink-dist.jar since all serious user are using it. Furthermore, I think it 
would be a good idea to not only log possible exceptions in `SubtaskState` but 
to communicate it back to the user. Thus, letting them bubble up and handling 
them on the `JobManager` level would be the right way to go.

I had some minor comments concerning test coverage and the way the job jars 
are constructed.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>  

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351062#comment-15351062
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68581644
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
--- End diff --

Maybe we could also refactor the current code so that the `JobGraph` no 
longer does the uploading of the jars to 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351046#comment-15351046
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580451
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
--- End diff --

I think we have to call `program.deleteExtractedLibraries()` at the end so 
that we clean up possibly extracted libraries.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351043#comment-15351043
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580226
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
+   String savepointPath = 
Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
 
-   Object result;
-   try {
-   logAndSysout("Waiting for response...");
-   result = Await.result(response, clientTimeout);
+   JobID jobId = options.getJobId();
+   String jarFile = options.getJarFilePath();
+
+   if (jobId != null && jarFile != null) {
+   throw new IllegalArgumentException("Cannot dispose 
savepoint without Job ID or JAR.");
+   }
+
+   ActorGateway jobManager = getJobManagerGateway(options);
+
+   final Future response;
+   if (jobId != null) {
+   // Dispose with class loader of running job
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "' of job " + jobId + " .");
+
+   response = jobManager.ask(
+   new 
JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+   clientTimeout);
+   } else if (jarFile != null) {
+   logAndSysout("Disposing savepoint at '" + savepointPath 
+ "'.");
+
+   // Dispose with uploaded user code loader
+   Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+   PackagedProgram program = new PackagedProgram(
+   new File(jarFile),
+   options.getClasspaths(),
+   options.getEntryPointClass(),
+   options.getProgramArgs());
+   FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, 1);
+
+   JobGraph jobGraph;
+   if (flinkPlan instanceof StreamingPlan) {
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph();
+   } else {
+   JobGraphGenerator gen = new 
JobGraphGenerator(this.config);
+   jobGraph = gen.compileJobGraph((OptimizedPlan) 
flinkPlan);
}
-   catch (Exception e) {
-   throw new Exception("Disposing the savepoint 
with path" + savepointPath + " failed.", e);
+
+   for (URL jar : program.getAllLibraries()) {
+   try {
+   jobGraph.addJar(new Path(jar.toURI()));
+   } catch (URISyntaxException e) {
+   throw new RuntimeException("URL is 
invalid. This should not happen.", e);
+   }
}
 
+   jobGraph.setClasspaths(program.getClasspaths());
+
+   logAndSysout("Uploading JAR files for savepoint 
disposal.");
+   JobClient.uploadJarFiles(jobGraph, jobManager, 
clientTimeout);
+
+   response = jobManager.ask(
+   new 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351039#comment-15351039
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68579929
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
}
 
/**
-* Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+* 
+* Either the job the savepoint belongs to is still running, in 
which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+* 
 */
-   private int disposeSavepoint(SavepointOptions options, String 
savepointPath) {
-   try {
-   ActorGateway jobManager = getJobManagerGateway(options);
-   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
-   Future response = jobManager.ask(new 
DisposeSavepoint(savepointPath), clientTimeout);
+   private int disposeSavepoint(SavepointOptions options) throws Throwable 
{
--- End diff --

Have we actually tested somewhere that the jars we're uploading to the 
JobManager allow us to dispose the given savepoint with user code?


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350959#comment-15350959
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68572931
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java 
---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
formatter.setSyntaxPrefix("  \"savepoint\" action options:");
formatter.printHelp(" ", getSavepointOptions(new Options()));
System.out.println();
+   System.out.println("\n  Examples:");
+   System.out.println("  - Trigger savepoint: bin/flink savepoint 
");
+   System.out.println("  - Dispose savepoint:");
+   System.out.println("* For a running job: bin/flink 
savepoint -d  ");
+   System.out.println("* For a terminated job: bin/flink 
savepoint -d  -j  [-c  -C ]");
--- End diff --

Is it consistent that we specify the job id without an option whereas we 
specify the jar with `-j`? Shouldn't only those parameters which are used 
everywhere be used without an option flag?


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350947#comment-15350947
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68571489
  
--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 
 Action "savepoint" triggers savepoints for a running job or disposes 
existing ones.
 
-  Syntax: savepoint [OPTIONS] 
-  "savepoint" action options:
- -d,--disposeDisposes an existing savepoint.
- -m,--jobmanager Address of the JobManager (master) to 
which
--- End diff --

What happened to the jobmanager option?


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325771#comment-15325771
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083
  
Full ack, but the problem is not just RocksDB, but also savepoints, which 
reference a user class in the state descriptor, including FS snapshots. So if 
you configure a file backend and use folding or reducing state, you run into 
the issue as well.

What about the following: make it optional and fail with a proper error 
message in case of a missing user code class. SubtaskState (previously 
StateForTask), which fails to dispose the state snapshots currently only logs 
the Exception. We can change this to rethrow and then we give a proper error 
message on failed savepoint disposal.


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325398#comment-15325398
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2083
  
It seems very clumsy to dispose savepoints like that. If that is needed for 
a fix currently, then I guess we have to live with that.

Would make sense to make the jar and jobid optional, so that non-custom 
cases can dispose the savepoints in the simple way.
Then, we could ad the RocksDB state backend to the jar in the flink 
distribution lib folder, so that it is always available and would not need jar 
or jobid for disposal. That would give the least user friction...


> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-06-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320670#comment-15320670
 ] 

ASF GitHub Bot commented on FLINK-3713:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2083

[FLINK-3713] [clients, runtime] Use user code class loader when disposing 
savepoints

Disposing savepoints via the JobManager fails for state handles or 
descriptors, which contain user classes (for example custom folding state or 
RocksDB handles).

With this change, the user has to provide the job ID of a running job when 
disposing a savepoint in order to use the user code class loader of that job or 
provide the job JARs.

This version breaks the API as the CLI now requires either a JobID or a 
JAR. I think this is reasonable, because the current approach only works for a 
subset of the available state variants.

We can port this back for 1.0.4 and make the JobID or JAR arguments 
optional. What do you think?

I've tested this with a job running on RocksDB state both while the job was 
running and after it terminated. This was not working with the current 1.0.3 
version.

Ideally, we will get rid of the whole disposal business when we make 
savepoints properly self-contained. I'm going to open a JIRA issue with a 
proposal to do so soon.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 3713-dispose_savepoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2083


commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi 
Date:   2016-06-08T08:59:24Z

[FLINK-3713] [clients, runtime] Use user code class loader when disposing 
savepoint

Disposing savepoints via the JobManager fails for state handles or 
descriptors,
which contain user classes (for example custom folding state or RocksDB 
handles).

With this change, the user has to provide the job ID of a running job when 
disposing
a savepoint in order to use the user code class loader of that job or 
provide the
job JARs.

This version breaks the API as the CLI now requires either a JobID or a 
JAR. I think
this is reasonable, because the current approach only works for a subset of 
the
available state variants.




> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>  

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-05-30 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306429#comment-15306429
 ] 

Ufuk Celebi commented on FLINK-3713:


Sorry, I didn't yet have time to work on this. I hope to do it before the next 
release though.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-05-10 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277969#comment-15277969
 ] 

Konstantin Knauf commented on FLINK-3713:
-

Great, thank you!

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-05-10 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277835#comment-15277835
 ] 

Ufuk Celebi commented on FLINK-3713:


Not yet, but I will assign this to myself and look into it until next week.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>  

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-05-10 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277761#comment-15277761
 ] 

Konstantin Knauf commented on FLINK-3713:
-

[~uce] Any progress on this? The workaround starts to get really annoying 
because of conflicting dependencies in different jobs...

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-09 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233497#comment-15233497
 ] 

Ufuk Celebi commented on FLINK-3713:


OK, sorry for this inconvenience. I hope to fix this soon. I think I'll go with 
asking for the JAR during disposal.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-08 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232116#comment-15232116
 ] 

Konstantin Knauf commented on FLINK-3713:
-

Works as expected.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-07 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230124#comment-15230124
 ] 

Konstantin Knauf commented on FLINK-3713:
-

I will give that a try and report.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-07 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230103#comment-15230103
 ] 

Ufuk Celebi commented on FLINK-3713:


It applies for any custom state (I don't know for sure whether it did only 
apply to custom state backends when savepoints were introduced).

Robert suggested as a work around to add the user jar to the classpath of Flink 
(e.g. by adding it to the lib folder of your installation). Can you try this as 
a work around?

The easiest fix will be to require the user JAR when discarding savepoints. 
Sounds like something we need to add to 1.0.2 soon.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-07 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230058#comment-15230058
 ] 

Konstantin Knauf commented on FLINK-3713:
-

Just for my understanding: 
It says there
{quote} 
Disposing custom state handles: Disposing an old savepoint does not work with 
custom state handles (if you are using a custom state backend), because the 
user code class loader is not available during disposal.
{quote}
We are using FsStateBackend, not a custome state backend. Or does "custom state 
backend" always mean user classes?

Let me give you some context, maybe we can come up with a different solution: 

We have a streaming job, which should always be running. Normally, flink should 
handle failures, but we can not solely rely on it (max restart attempts, some 
bug,..). So we have a script, which runs regularly and checks if the job is 
still running. If it is not running over a period of x minutes (to take account 
for automatic recovery by flink) the job is restarted again from a savepoint. 
Everytime this script runs a new savepoint is triggered, and the old one is 
discarded. So we need this regular savepoint to be able to do this "manual" 
recovery. So we are triggering quite a lot of savepoints and need some way of 
cleaning up the old ones. Do you have a different idea how to do this cleanup? 
Just deleting the directory in HDFS? If you have a complete different idea how 
to handle this supervision, I am also happy about suggestions. 

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-07 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230050#comment-15230050
 ] 

Robert Metzger commented on FLINK-3713:
---

One possible workaround for the issue is the following: You can put the job jar 
into the lib/ folder of flink. Then, the user code is available through the 
system classloader and Flink can find the class when discarding the state.

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 

[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state

2016-04-07 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230043#comment-15230043
 ] 

Ufuk Celebi commented on FLINK-3713:


This is unfortunately a known limitation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html#current-limitations),
 which I only thought of after I was "done" with the initial pull request 
preparations.

How much of an headache is this for you right now?

> DisposeSavepoint message uses system classloader to discard state
> -
>
> Key: FLINK-3713
> URL: https://issues.apache.org/jira/browse/FLINK-3713
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Robert Metzger
>
> The {{DisposeSavepoint}} message in the JobManager is using the system 
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager
>   - Disposing savepoint at 
> 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  
> org.apache.flink.runtime.checkpoint.StateForTask  - Failed to 
> discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : 
> SerializedValue
> java.lang.ClassNotFoundException: 
> .MetricsProcessor$CombinedKeysFoldFunction
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:270)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>   at java.util.HashMap.readObject(HashMap.java:1184)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>   at