[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382418#comment-15382418 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2083

> DisposeSavepoint message uses system classloader to discard state
> -----------------------------------------------------------------
>
>                 Key: FLINK-3713
>                 URL: https://issues.apache.org/jira/browse/FLINK-3713
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>            Reporter: Robert Metzger
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
>
> The {{DisposeSavepoint}} message in the JobManager is using the system
> classloader to discard the state:
> {code}
> val savepoint = savepointStore.getState(savepointPath)
> log.debug(s"$savepoint")
> // Discard the associated checkpoint
> savepoint.discard(getClass.getClassLoader)
> // Dispose the savepoint
> savepointStore.disposeState(savepointPath)
> {code}
> Which leads to issues when the state contains user classes:
> {code}
> 2016-04-07 03:02:12,225 INFO  org.apache.flink.yarn.YarnJobManager - Disposing savepoint at 'hdfs:/bigdata/flink/savepoints/savepoint-7610540d089f'.
> 2016-04-07 03:02:12,233 WARN  org.apache.flink.runtime.checkpoint.StateForTask - Failed to discard checkpoint state: StateForTask eed5cc5a12dc2e0672848ba81bd8fa6d-0 : SerializedValue
> java.lang.ClassNotFoundException: .MetricsProcessor$CombinedKeysFoldFunction
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:270)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at ...
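The failing lookup in the trace above happens in `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass`, which goes through `Class.forName` and ends up in `Launcher$AppClassLoader`, i.e. the system class loader. The sketch below reproduces that mechanism with plain JDK classes only; `ClassLoaderAwareDeserialization` and `UserState` are hypothetical stand-ins, not Flink code. It shows that the same serialized bytes deserialize fine with a loader that can see the user class and fail with `ClassNotFoundException` with one that cannot, which is roughly what happens when the JobManager passes `getClass.getClassLoader` instead of a loader that includes the job's JAR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves every class against an explicit class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Throws ClassNotFoundException if the given loader cannot see the class.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Hypothetical stand-in for a user-defined class stored in checkpoint state. */
    static final class UserState implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;

        UserState(String key) {
            this.key = key;
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new UserState("k1"));

        // A loader that can see UserState: the round trip succeeds.
        UserState restored = (UserState) deserialize(data, UserState.class.getClassLoader());
        System.out.println("restored key: " + restored.key);

        // A loader that only delegates to the bootstrap loader cannot see UserState,
        // mirroring a JobManager-side loader that lacks the job's JAR.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            deserialize(data, isolated);
            System.out.println("unexpected success");
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

The fix discussed in this issue amounts to choosing the right loader to hand to this kind of stream, not changing the deserialization itself.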
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372973#comment-15372973 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083

If there are no objections, I would like to merge this.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15364246#comment-15364246 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083

I've addressed the comments:

- Moved the JAR uploading to a helper:
```java
// Retrieve blob server address and upload JARs to server
uploadJarFiles(ActorGateway, FiniteDuration, List);

// Upload JARs to server
uploadJarFiles(InetSocketAddress, List);
```
I did not remove the `JobGraph` upload JARs method, but it now calls these helpers. I decided against removing it because it's called in multiple places with the same pattern (upload, get BLOB keys, set BLOB keys).
- I now extract the libraries as you suggested (no need for the job graph now).
- Removed the job ID variant, as it was overloading disposal too much, and just stuck to the JAR variant, which is now optional (no script API break).
- State disposal errors are now propagated, and a failed disposal with `ClassNotFoundException` gives a hint to provide the JAR file.
- I've added an automated test with custom KV state in `ClassLoaderITCase`.
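The comment above says a failed disposal caused by `ClassNotFoundException` now gives a hint to provide the JAR file. One way to produce such a hint, sketched under the assumption that the `ClassNotFoundException` may be wrapped several levels deep, is to walk the cause chain. `DisposalErrorHints` and the message wording are hypothetical, not the text the PR actually emits.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public final class DisposalErrorHints {

    private DisposalErrorHints() {}

    /** Returns the first ClassNotFoundException in the cause chain, or null if there is none. */
    static ClassNotFoundException findClassNotFound(Throwable error) {
        // Identity set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof ClassNotFoundException) {
                return (ClassNotFoundException) t;
            }
        }
        return null;
    }

    /** Builds a user-facing failure message, adding a hint if a user class was missing. */
    static String describeFailure(String savepointPath, Throwable error) {
        StringBuilder msg = new StringBuilder()
                .append("Failed to dispose savepoint '").append(savepointPath).append("': ")
                .append(error.getMessage());
        ClassNotFoundException missing = findClassNotFound(error);
        if (missing != null) {
            msg.append(". The state references class ").append(missing.getMessage())
               .append(", which could not be loaded. Please provide the job's JAR file.");
        }
        return msg.toString();
    }

    public static void main(String[] args) {
        Exception failure = new Exception("Could not discard state",
                new ClassNotFoundException("com.example.MyFoldFunction"));
        System.out.println(describeFailure("hdfs:///savepoints/savepoint-1", failure));
    }
}
```

Propagating the full exception and only decorating the message keeps the original stack trace available for debugging.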
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351186#comment-15351186 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2083

Thanks for the review. I will propagate the errors, make the job ID/JAR arguments optional, and try to simplify parts of the CLI as you suggested.

Regarding including the RocksDB JAR in dist, I think that should be handled as a separate issue.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351180#comment-15351180 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68590084

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     }
     /**
-     * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-     * message to the job manager.
+     * Asks the JobManager to dispose a savepoint.
+     *
+     * There are two options for this:
+     *
+     * Either the job the savepoint belongs to is still running, in which
+     * case the user code class loader of the job is used.
+     * Or the job terminated, in which case the user JARs have to be
+     * uploaded before disposing the savepoint.
+     *
      */
-    private int disposeSavepoint(SavepointOptions options, String savepointPath) {
-        try {
-            ActorGateway jobManager = getJobManagerGateway(options);
-            logAndSysout("Disposing savepoint '" + savepointPath + "'.");
-            Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+    private int disposeSavepoint(SavepointOptions options) throws Throwable {
+        String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
-            Object result;
-            try {
-                logAndSysout("Waiting for response...");
-                result = Await.result(response, clientTimeout);
+        JobID jobId = options.getJobId();
+        String jarFile = options.getJarFilePath();
+
+        if (jobId != null && jarFile != null) {
+            throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+        }
+
+        ActorGateway jobManager = getJobManagerGateway(options);
+
+        final Future response;
+        if (jobId != null) {
+            // Dispose with class loader of running job
+            logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+                    clientTimeout);
+        } else if (jarFile != null) {
+            logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+            // Dispose with uploaded user code loader
+            Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+            PackagedProgram program = new PackagedProgram(
+                    new File(jarFile),
+                    options.getClasspaths(),
+                    options.getEntryPointClass(),
+                    options.getProgramArgs());
+            FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
+
+            JobGraph jobGraph;
+            if (flinkPlan instanceof StreamingPlan) {
+                jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+            } else {
+                JobGraphGenerator gen = new JobGraphGenerator(this.config);
+                jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
             }
-            catch (Exception e) {
-                throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
+
+            for (URL jar : program.getAllLibraries()) {
+                try {
+                    jobGraph.addJar(new Path(jar.toURI()));
+                } catch (URISyntaxException e) {
+                    throw new RuntimeException("URL is invalid. This should not happen.", e);
+                }
             }
+            jobGraph.setClasspaths(program.getClasspaths());
+
+            logAndSysout("Uploading JAR files for savepoint disposal.");
+            JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
+
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepointWithClassLoader(
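In the quoted diff, the guard `jobId != null && jarFile != null` fires only when both arguments are given, while its message ("Cannot dispose savepoint without Job ID or JAR.") describes the case where neither is given. A minimal sketch of argument handling whose message matches its condition, assuming (as the follow-up review comment proposes) that both arguments are optional. The `Mode` enum and `DisposeSavepointArgs` are hypothetical, and the job ID is modeled as a plain `String` rather than Flink's `JobID`.

```java
public final class DisposeSavepointArgs {

    /** Hypothetical: which class loader the disposal should use. */
    enum Mode { RUNNING_JOB, UPLOADED_JAR, DEFAULT_CLASSLOADER }

    private DisposeSavepointArgs() {}

    static Mode selectMode(String jobId, String jarFile) {
        if (jobId != null && jarFile != null) {
            // Supplying both is ambiguous; the message now describes this exact case.
            throw new IllegalArgumentException(
                    "Specify either a job ID or a JAR file for savepoint disposal, not both.");
        }
        if (jobId != null) {
            return Mode.RUNNING_JOB;     // use the running job's user code class loader
        }
        if (jarFile != null) {
            return Mode.UPLOADED_JAR;    // upload the JAR and build a loader from it
        }
        // Neither given: both arguments are optional, so fall back to the default loader.
        return Mode.DEFAULT_CLASSLOADER;
    }

    public static void main(String[] args) {
        System.out.println(selectMode("a1b2c3", null));
        System.out.println(selectMode(null, "job.jar"));
        System.out.println(selectMode(null, null));
    }
}
```

Returning an explicit mode keeps the validation separate from the message-sending branches, so each branch can assume its inputs are consistent.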
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351166#comment-15351166 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588625

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
     }
     /**
-     * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-     * message to the job manager.
+     * Asks the JobManager to dispose a savepoint.
+     *
+     * There are two options for this:
+     *
+     * Either the job the savepoint belongs to is still running, in which
+     * case the user code class loader of the job is used.
+     * Or the job terminated, in which case the user JARs have to be
+     * uploaded before disposing the savepoint.
+     *
      */
-    private int disposeSavepoint(SavepointOptions options, String savepointPath) {
-        try {
-            ActorGateway jobManager = getJobManagerGateway(options);
-            logAndSysout("Disposing savepoint '" + savepointPath + "'.");
-            Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+    private int disposeSavepoint(SavepointOptions options) throws Throwable {
+        String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
-            Object result;
-            try {
-                logAndSysout("Waiting for response...");
-                result = Await.result(response, clientTimeout);
+        JobID jobId = options.getJobId();
+        String jarFile = options.getJarFilePath();
+
+        if (jobId != null && jarFile != null) {
+            throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+        }
+
+        ActorGateway jobManager = getJobManagerGateway(options);
+
+        final Future response;
+        if (jobId != null) {
+            // Dispose with class loader of running job
+            logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+            response = jobManager.ask(
+                    new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+                    clientTimeout);
+        } else if (jarFile != null) {
+            logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+            // Dispose with uploaded user code loader
+            Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+            PackagedProgram program = new PackagedProgram(
--- End diff --

Oh, good catch!
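The `jarFile` branch depends on the JobManager ending up with a class loader that can see the user's JARs. Stripped of the BLOB upload, that comes down to a `java.net.URLClassLoader` over the JAR files with the framework loader as parent. The sketch assumes the JARs are local files; the helper name `userCodeClassLoader` and the path are hypothetical, and how Flink actually turns uploaded BLOB keys into a loader is not shown here.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

public final class UserCodeLoaderSketch {

    private UserCodeLoaderSketch() {}

    /** Creates a class loader over the given JAR files; lookups try the parent first. */
    static URLClassLoader userCodeClassLoader(List<Path> jarFiles, ClassLoader parent)
            throws MalformedURLException {
        URL[] urls = new URL[jarFiles.size()];
        for (int i = 0; i < urls.length; i++) {
            // Path -> file: URI -> URL; the file is not checked for existence here.
            urls[i] = jarFiles.get(i).toUri().toURL();
        }
        return new URLClassLoader(urls, parent);
    }

    public static void main(String[] args) throws Exception {
        List<Path> jars = Arrays.asList(Paths.get("build", "my-job.jar")); // hypothetical JAR path
        try (URLClassLoader loader =
                userCodeClassLoader(jars, UserCodeLoaderSketch.class.getClassLoader())) {
            System.out.println("user code class path: " + Arrays.toString(loader.getURLs()));
            // JDK classes still resolve through the parent chain.
            System.out.println(loader.loadClass("java.lang.String") == String.class);
        }
    }
}
```

A loader built this way is the kind of object that would be handed to `savepoint.discard(...)` in place of `getClass.getClassLoader`; closing it afterwards releases the JAR file handles.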
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351161#comment-15351161 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68588080

--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
 Action "savepoint" triggers savepoints for a running job or disposes existing ones.
-  Syntax: savepoint [OPTIONS]
-  "savepoint" action options:
-     -d,--dispose Disposes an existing savepoint.
-     -m,--jobmanager Address of the JobManager (master) to which
--- End diff --

I think it should still be there, but let me check again.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351157#comment-15351157 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68587921

--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
  formatter.setSyntaxPrefix(" \"savepoint\" action options:");
  formatter.printHelp(" ", getSavepointOptions(new Options()));
  System.out.println();
+ System.out.println("\n Examples:");
+ System.out.println(" - Trigger savepoint: bin/flink savepoint ");
+ System.out.println(" - Dispose savepoint:");
+ System.out.println("* For a running job: bin/flink savepoint -d ");
+ System.out.println("* For a terminated job: bin/flink savepoint -d -j [-c -C ]");
--- End diff --

True, that's inconsistent.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351075#comment-15351075 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2083

Good work @uce :-) I agree that it is currently a bit clumsy to discard savepoints of jobs which are no longer running. From the user perspective it should be as easy as possible. I also think that it would be a good idea to add the RocksDB jar to the flink-dist.jar since all serious users are using it.

Furthermore, I think it would be a good idea not only to log possible exceptions in `SubtaskState` but also to communicate them back to the user. Letting them bubble up and handling them at the `JobManager` level would be the right way to go.

I had some minor comments concerning test coverage and the way the job jars are constructed.
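The suggestion to let disposal exceptions bubble up instead of only logging them can be sketched as follows. This is a hypothetical, self-contained illustration: `DiscardableState` and `discardAll` are invented names, not Flink's `SubtaskState` API. The sketch attempts every handle, so one failure does not leave the remaining state behind, and then rethrows the first failure with any later ones attached via `addSuppressed`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DiscardAndRethrow {

    /** Hypothetical stand-in for a state handle that needs the user-code class loader to discard. */
    public interface DiscardableState {
        void discard(ClassLoader userCodeLoader) throws Exception;
    }

    /**
     * Attempts to discard every handle. Instead of only logging failures, it rethrows the
     * first one, with later failures attached as suppressed exceptions, so the caller
     * (e.g. the JobManager) can report the problem back to the user.
     */
    public static void discardAll(List<? extends DiscardableState> states, ClassLoader loader)
            throws Exception {
        Exception firstFailure = null;
        for (DiscardableState state : states) {
            try {
                state.discard(loader);
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        List<DiscardableState> states = Arrays.asList(
            loader -> discarded.add("subtask-0"),
            loader -> { throw new ClassNotFoundException("com.example.MyFoldFunction"); },
            loader -> discarded.add("subtask-2"));

        try {
            discardAll(states, DiscardAndRethrow.class.getClassLoader());
        } catch (Exception e) {
            // The failure reaches the caller instead of disappearing into a log line.
            System.out.println("Savepoint disposal failed: " + e);
        }
        System.out.println("Discarded despite the failure: " + discarded);
    }
}
```

Trying all handles before rethrowing is a design choice; failing fast on the first error would be simpler but could leave more state behind.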
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351062#comment-15351062 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68581644

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
  }
  /**
-* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+*
+* Either the job the savepoint belongs to is still running, in which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+*
  */
- private int disposeSavepoint(SavepointOptions options, String savepointPath) {
- try {
- ActorGateway jobManager = getJobManagerGateway(options);
- logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+ private int disposeSavepoint(SavepointOptions options) throws Throwable {
+ String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
- Object result;
- try {
- logAndSysout("Waiting for response...");
- result = Await.result(response, clientTimeout);
+ JobID jobId = options.getJobId();
+ String jarFile = options.getJarFilePath();
+
+ if (jobId != null && jarFile != null) {
+ throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+ }
+
+ ActorGateway jobManager = getJobManagerGateway(options);
+
+ final Future response;
+ if (jobId != null) {
+ // Dispose with class loader of running job
+ logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+ response = jobManager.ask(
+ new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+ clientTimeout);
+ } else if (jarFile != null) {
+ logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+ // Dispose with uploaded user code loader
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+ PackagedProgram program = new PackagedProgram(
+ new File(jarFile),
+ options.getClasspaths(),
+ options.getEntryPointClass(),
+ options.getProgramArgs());
+ FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
+
+ JobGraph jobGraph;
+ if (flinkPlan instanceof StreamingPlan) {
+ jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+ } else {
+ JobGraphGenerator gen = new JobGraphGenerator(this.config);
+ jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
  }
- catch (Exception e) {
- throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
+
+ for (URL jar : program.getAllLibraries()) {
+ try {
+ jobGraph.addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
  }
+ jobGraph.setClasspaths(program.getClasspaths());
+
+ logAndSysout("Uploading JAR files for savepoint disposal.");
+ JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
--- End diff --

Maybe we could also refactor the current code so that the `JobGraph` no longer does the uploading of the jars to
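The loop in this hunk turns each library `URL` into a path via `toURI()` before attaching it to the `JobGraph`. Below is a standalone sketch of just that conversion step. It uses `java.nio.file.Path` in place of Flink's own `Path` type, an assumption made to keep the example self-contained, and `LibraryPaths` is a hypothetical name.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class LibraryPaths {

    /**
     * Converts the URLs of a program's libraries into file system paths.
     * Paths.get(URI) on the default file system only accepts file: URIs.
     */
    public static List<Path> toLocalPaths(List<URL> libraries) {
        List<Path> paths = new ArrayList<>();
        for (URL jar : libraries) {
            try {
                paths.add(Paths.get(jar.toURI()));
            } catch (URISyntaxException e) {
                // A URL that is not a valid URI points to a bug upstream, not a user error.
                throw new IllegalStateException("Invalid library URL: " + jar, e);
            }
        }
        return paths;
    }

    public static void main(String[] args) throws Exception {
        List<URL> libraries = new ArrayList<>();
        libraries.add(Paths.get("user-job.jar").toAbsolutePath().toUri().toURL());
        System.out.println(toLocalPaths(libraries));
    }
}
```

`Paths.get(URI)` only works for URIs of an installed file system provider, so a non-`file:` URL would fail with `FileSystemNotFoundException` rather than `URISyntaxException`.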
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351046#comment-15351046 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580451

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
  }
  /**
-* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+*
+* Either the job the savepoint belongs to is still running, in which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+*
  */
- private int disposeSavepoint(SavepointOptions options, String savepointPath) {
- try {
- ActorGateway jobManager = getJobManagerGateway(options);
- logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+ private int disposeSavepoint(SavepointOptions options) throws Throwable {
+ String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
- Object result;
- try {
- logAndSysout("Waiting for response...");
- result = Await.result(response, clientTimeout);
+ JobID jobId = options.getJobId();
+ String jarFile = options.getJarFilePath();
+
+ if (jobId != null && jarFile != null) {
+ throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+ }
+
+ ActorGateway jobManager = getJobManagerGateway(options);
+
+ final Future response;
+ if (jobId != null) {
+ // Dispose with class loader of running job
+ logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+ response = jobManager.ask(
+ new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+ clientTimeout);
+ } else if (jarFile != null) {
+ logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+ // Dispose with uploaded user code loader
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+ PackagedProgram program = new PackagedProgram(
--- End diff --

I think we have to call `program.deleteExtractedLibraries()` at the end so that we clean up possibly extracted libraries.
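Calling `program.deleteExtractedLibraries()` "at the end" is easiest to get right with `try`/`finally`, so that cleanup also happens when the upload or the disposal request fails. The following is a minimal sketch that assumes extraction produces temporary files. `ExtractingProgram` and `disposeWithUploadedJars` are hypothetical stand-ins, not `PackagedProgram` or `CliFrontend` code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CleanupExtractedLibraries {

    /** Hypothetical stand-in for a packaged program that extracts nested JARs into temp files. */
    public static final class ExtractingProgram {
        public final List<Path> extracted = new ArrayList<>();

        void extractLibraries(int count) throws IOException {
            for (int i = 0; i < count; i++) {
                extracted.add(Files.createTempFile("extracted-lib-", ".jar"));
            }
        }

        /** Plays the role of the deleteExtractedLibraries() call suggested in the review. */
        void deleteExtractedLibraries() {
            for (Path file : extracted) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException ignored) {
                    // Best-effort cleanup: keep deleting the remaining files.
                }
            }
        }
    }

    /** Extracts libraries, runs the (possibly failing) disposal, and always cleans up. */
    public static void disposeWithUploadedJars(ExtractingProgram program, boolean failUpload)
            throws IOException {
        try {
            program.extractLibraries(2);
            if (failUpload) {
                throw new IOException("Uploading JAR files for savepoint disposal failed");
            }
            // ... upload the JARs and ask the JobManager to dispose the savepoint ...
        } finally {
            // Runs on success and on failure, so no temporary JARs are left behind.
            program.deleteExtractedLibraries();
        }
    }

    public static void main(String[] args) {
        ExtractingProgram program = new ExtractingProgram();
        try {
            disposeWithUploadedJars(program, true);
        } catch (IOException e) {
            System.out.println("Disposal failed: " + e.getMessage());
        }
        for (Path file : program.extracted) {
            System.out.println(file.getFileName() + " still exists: " + Files.exists(file));
        }
    }
}
```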
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351043#comment-15351043 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68580226

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
  }
  /**
-* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+*
+* Either the job the savepoint belongs to is still running, in which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+*
  */
- private int disposeSavepoint(SavepointOptions options, String savepointPath) {
- try {
- ActorGateway jobManager = getJobManagerGateway(options);
- logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+ private int disposeSavepoint(SavepointOptions options) throws Throwable {
+ String savepointPath = Preconditions.checkNotNull(options.getSavepointPath(), "Savepoint path");
- Object result;
- try {
- logAndSysout("Waiting for response...");
- result = Await.result(response, clientTimeout);
+ JobID jobId = options.getJobId();
+ String jarFile = options.getJarFilePath();
+
+ if (jobId != null && jarFile != null) {
+ throw new IllegalArgumentException("Cannot dispose savepoint without Job ID or JAR.");
+ }
+
+ ActorGateway jobManager = getJobManagerGateway(options);
+
+ final Future response;
+ if (jobId != null) {
+ // Dispose with class loader of running job
+ logAndSysout("Disposing savepoint at '" + savepointPath + "' of job " + jobId + " .");
+
+ response = jobManager.ask(
+ new JobManagerMessages.DisposeSavepoint(savepointPath, jobId),
+ clientTimeout);
+ } else if (jarFile != null) {
+ logAndSysout("Disposing savepoint at '" + savepointPath + "'.");
+
+ // Dispose with uploaded user code loader
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+ PackagedProgram program = new PackagedProgram(
+ new File(jarFile),
+ options.getClasspaths(),
+ options.getEntryPointClass(),
+ options.getProgramArgs());
+ FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, 1);
+
+ JobGraph jobGraph;
+ if (flinkPlan instanceof StreamingPlan) {
+ jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+ } else {
+ JobGraphGenerator gen = new JobGraphGenerator(this.config);
+ jobGraph = gen.compileJobGraph((OptimizedPlan) flinkPlan);
  }
- catch (Exception e) {
- throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
+
+ for (URL jar : program.getAllLibraries()) {
+ try {
+ jobGraph.addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
  }
+ jobGraph.setClasspaths(program.getClasspaths());
+
+ logAndSysout("Uploading JAR files for savepoint disposal.");
+ JobClient.uploadJarFiles(jobGraph, jobManager, clientTimeout);
+
+ response = jobManager.ask(
+ new
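The Javadoc in this hunk describes two mutually exclusive ways to obtain the right class loader: the running job's loader (Job ID) or uploaded JARs. In the quoted check, the `IllegalArgumentException` is thrown when *both* a Job ID and a JAR are given, but its message reads "Cannot dispose savepoint without Job ID or JAR." A minimal sketch of the selection logic follows, with separate messages for each case. `DisposeModeSelection` and `DisposeMode` are hypothetical names, and the Job ID is modelled as a `String` rather than Flink's `JobID`. The sketch assumes that when neither input is given, disposal falls back to the JobManager's own class loader, which is the pre-patch behaviour described in the issue and is in line with the proposal in this thread to make the JAR optional.

```java
public class DisposeModeSelection {

    /** How the class loader for discarding savepoint state would be obtained. */
    public enum DisposeMode { RUNNING_JOB_LOADER, UPLOADED_JARS, JOBMANAGER_LOADER }

    /**
     * Picks the disposal mode from the two optional CLI inputs. Job ID and JAR are
     * mutually exclusive; with neither, this sketch falls back to the JobManager's own
     * class loader, which only works if the state references no user classes.
     */
    public static DisposeMode chooseMode(String jobId, String jarFile) {
        if (jobId != null && jarFile != null) {
            throw new IllegalArgumentException(
                "Specify either a Job ID or a JAR file for savepoint disposal, not both.");
        }
        if (jobId != null) {
            return DisposeMode.RUNNING_JOB_LOADER;
        }
        if (jarFile != null) {
            return DisposeMode.UPLOADED_JARS;
        }
        return DisposeMode.JOBMANAGER_LOADER;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode("a1b2c3", null));
        System.out.println(chooseMode(null, "/path/to/job.jar"));
        System.out.println(chooseMode(null, null));
    }
}
```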
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351039#comment-15351039 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68579929

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -790,39 +767,95 @@ else if (result instanceof TriggerSavepointFailure) {
  }
  /**
-* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
-* message to the job manager.
+* Asks the JobManager to dispose a savepoint.
+*
+* There are two options for this:
+*
+* Either the job the savepoint belongs to is still running, in which
+* case the user code class loader of the job is used.
+* Or the job terminated, in which case the user JARs have to be
+* uploaded before disposing the savepoint.
+*
  */
- private int disposeSavepoint(SavepointOptions options, String savepointPath) {
- try {
- ActorGateway jobManager = getJobManagerGateway(options);
- logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- Future response = jobManager.ask(new DisposeSavepoint(savepointPath), clientTimeout);
+ private int disposeSavepoint(SavepointOptions options) throws Throwable {
--- End diff --

Have we actually tested somewhere that the jars we're uploading to the JobManager allow us to dispose the given savepoint with user code?
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350959#comment-15350959 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68572931

--- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---
@@ -305,6 +307,11 @@ public static void printHelpForSavepoint() {
  formatter.setSyntaxPrefix(" \"savepoint\" action options:");
  formatter.printHelp(" ", getSavepointOptions(new Options()));
  System.out.println();
+ System.out.println("\n Examples:");
+ System.out.println(" - Trigger savepoint: bin/flink savepoint ");
+ System.out.println(" - Dispose savepoint:");
+ System.out.println("* For a running job: bin/flink savepoint -d ");
+ System.out.println("* For a terminated job: bin/flink savepoint -d -j [-c -C ]");
--- End diff --

Is it consistent that we specify the job id without an option whereas we specify the jar with `-j`? Shouldn't only those parameters which are used everywhere be given without an option flag?
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350947#comment-15350947 ]

ASF GitHub Bot commented on FLINK-3713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2083#discussion_r68571489

--- Diff: docs/apis/cli.md ---
@@ -279,13 +289,27 @@ guarantees for a stop request.
  Action "savepoint" triggers savepoints for a running job or disposes existing ones.
- Syntax: savepoint [OPTIONS]
- "savepoint" action options:
- -d,--dispose Disposes an existing savepoint.
- -m,--jobmanager Address of the JobManager (master) to which
--- End diff --

What happened to the jobmanager option?
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325771#comment-15325771 ] ASF GitHub Bot commented on FLINK-3713:
---
Github user uce commented on the issue: https://github.com/apache/flink/pull/2083
Full ack, but the problem is not just RocksDB. It also affects savepoints that reference a user class in a state descriptor, including FS snapshots: if you configure a file backend and use folding or reducing state, you run into the issue as well.
What about the following: make the JAR/job ID optional and fail with a proper error message when a user code class is missing. When SubtaskState (previously StateForTask) fails to dispose its state snapshots, it currently only logs the exception. We can change this to rethrow, and then give a proper error message on failed savepoint disposal.
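uce's proposal above is to stop swallowing disposal failures: rethrow instead of only logging, and tell the user what is missing. A minimal Scala sketch of that idea, not Flink's actual code; `DiscardableState`, `SavepointDisposalException`, and `disposeAll` are hypothetical stand-ins for SubtaskState and its state handles:

```scala
object SavepointDisposal {
  // Hypothetical stand-in for a state handle that needs the user code class
  // loader to be discarded (Flink's SubtaskState and handle types are not modeled).
  trait DiscardableState {
    def discard(userCodeLoader: ClassLoader): Unit
  }

  // Hypothetical exception type carrying an actionable message for the user.
  final class SavepointDisposalException(message: String, cause: Throwable)
      extends Exception(message, cause)

  // Discards each state handle in order. A missing user class is not just
  // logged: it is rethrown with a message telling the user what to provide.
  def disposeAll(states: Seq[DiscardableState], userCodeLoader: ClassLoader): Unit =
    states.foreach { state =>
      try state.discard(userCodeLoader)
      catch {
        case e: ClassNotFoundException =>
          throw new SavepointDisposalException(
            s"Cannot dispose savepoint: user class '${e.getMessage}' is not available. " +
              "Provide the job JAR or the ID of a running job.",
            e)
      }
    }
}
```

This sketch fails fast on the first missing class; an alternative is to attempt every handle and report all failures together, at the cost of a more involved error type.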
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325398#comment-15325398 ] ASF GitHub Bot commented on FLINK-3713:
---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2083
It seems very clumsy to dispose savepoints like that. If that is needed for a fix currently, then I guess we have to live with it.
It would make sense to make the JAR and job ID optional, so that non-custom cases can dispose savepoints in the simple way. Then, we could add the RocksDB state backend to the JAR in the Flink distribution's lib folder, so that it is always available and would not need a JAR or job ID for disposal. That would cause the least user friction.
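Stephan's suggestion boils down to choosing the class loader from whatever the user supplies and falling back to the default loader otherwise. A minimal Scala sketch of that selection logic; `DisposeRequest`, the map of running jobs' loaders, and `classLoaderFor` are hypothetical and do not correspond to Flink's actual messages or CLI:

```scala
import java.net.{URL, URLClassLoader}

object DisposalClassLoaders {
  // Hypothetical disposal request: both the job ID and the JARs are optional.
  final case class DisposeRequest(savepointPath: String, jobId: Option[String], jarUrls: Seq[URL])

  // Chooses the loader used to discard a savepoint's state:
  //  1. a running job's user code class loader, if a job ID is given;
  //  2. a URLClassLoader over the supplied JARs, if any;
  //  3. otherwise the system loader, which suffices when the state holds no
  //     user classes (e.g. a state backend shipped in Flink's lib folder).
  def classLoaderFor(
      request: DisposeRequest,
      runningJobLoaders: Map[String, ClassLoader],
      systemLoader: ClassLoader): ClassLoader =
    request.jobId match {
      case Some(id) =>
        runningJobLoaders.getOrElse(
          id, throw new IllegalArgumentException(s"No running job with ID $id"))
      case None if request.jarUrls.nonEmpty =>
        new URLClassLoader(request.jarUrls.toArray, systemLoader)
      case None =>
        systemLoader
    }
}
```

Creating a separate `URLClassLoader` per request keeps the dependencies of different jobs apart instead of sharing one class path; the caller should close a loader created from JARs once disposal is done (but not a running job's loader).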
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320670#comment-15320670 ] ASF GitHub Bot commented on FLINK-3713:
---
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2083
[FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoints
Disposing savepoints via the JobManager fails for state handles or descriptors that contain user classes (for example, custom folding state or RocksDB handles).
With this change, the user has to provide either the job ID of a running job, so that the user code class loader of that job is used, or the job JARs. This version breaks the API, as the CLI now requires either a JobID or a JAR. I think this is reasonable, because the current approach only works for a subset of the available state variants. We can port this back to 1.0.4 and make the JobID or JAR arguments optional. What do you think?
I've tested this with a job running on RocksDB state, both while the job was running and after it terminated. This did not work with the current 1.0.3 version.
Ideally, we will get rid of the whole disposal business when we make savepoints properly self-contained. I'm going to open a JIRA issue with a proposal to do so soon.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 3713-dispose_savepoint
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2083.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2083
commit 1cdfe9b4df3584b3c3c48168cd3f17100dbebf4c
Author: Ufuk Celebi
Date: 2016-06-08T08:59:24Z
[FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoint
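The core of the PR is that state must be deserialized through the job's user code class loader rather than the JobManager's own loader; in the stack trace above, Flink's InstantiationUtil$ClassLoaderObjectInputStream ends up resolving classes through the system class loader. A minimal Scala sketch of the mechanism, assuming plain Java serialization (`UserCodeObjectInputStream` and `UserCodeSerde` are hypothetical names, not Flink's API):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Resolves every class in the stream through the given loader instead of the
// loader that ObjectInputStream would pick by default.
final class UserCodeObjectInputStream(in: InputStream, userCodeLoader: ClassLoader)
    extends ObjectInputStream(in) {

  private val primitiveNames =
    Set("boolean", "byte", "char", "short", "int", "long", "float", "double", "void")

  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, userCodeLoader)
    catch {
      // Primitive type names (e.g. "int") cannot be loaded by name, so let the
      // default implementation map them; anything else is a genuinely missing class.
      case e: ClassNotFoundException =>
        if (primitiveNames(desc.getName)) super.resolveClass(desc) else throw e
    }
}

object UserCodeSerde {
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Deserializes using the supplied (e.g. user code) class loader.
  def deserialize(bytes: Array[Byte], userCodeLoader: ClassLoader): AnyRef = {
    val in = new UserCodeObjectInputStream(new ByteArrayInputStream(bytes), userCodeLoader)
    try in.readObject() finally in.close()
  }
}
```

With this sketch, `UserCodeSerde.deserialize(UserCodeSerde.serialize(Some("state")), classOf[Some[_]].getClassLoader)` returns `Some("state")`, while a loader that cannot see `scala.Some` makes `readObject` fail with `ClassNotFoundException`, the same failure mode as in the log above.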
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306429#comment-15306429 ] Ufuk Celebi commented on FLINK-3713:
Sorry, I haven't had time to work on this yet. I hope to get to it before the next release, though.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277969#comment-15277969 ] Konstantin Knauf commented on FLINK-3713:
-
Great, thank you!
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277835#comment-15277835 ] Ufuk Celebi commented on FLINK-3713:
Not yet, but I will assign this to myself and look into it by next week.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277761#comment-15277761 ] Konstantin Knauf commented on FLINK-3713:
-
[~uce] Any progress on this? The workaround is starting to get really annoying because of conflicting dependencies across different jobs.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233497#comment-15233497 ] Ufuk Celebi commented on FLINK-3713: OK, sorry for this inconvenience. I hope to fix this soon. I think I'll go with asking for the JAR during disposal.
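The following is a minimal sketch of what "asking for the JAR during disposal" could look like. It is not the actual fix from the pull request. It assumes the dispose request carries the URLs of the user JARs. The handler then builds a `URLClassLoader` from those URLs and passes it to `discard`, instead of `getClass.getClassLoader` as in the snippet from the issue description. `Savepoint`, `SavepointStore`, `DisposeSavepointWithJars` and `disposeSavepoint` are hypothetical stand-ins modeled on that snippet, not Flink's real classes or messages.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-ins modeled on the issue's snippet (not Flink's real classes).
trait Savepoint {
  // Discards the checkpoint state; needs a loader that can see user classes.
  def discard(userCodeLoader: ClassLoader): Unit
}

trait SavepointStore {
  def getState(path: String): Savepoint
  def disposeState(path: String): Unit
}

// Hypothetical message: the client sends the user JAR location(s) along with the path.
final case class DisposeSavepointWithJars(savepointPath: String, userJars: Seq[URL])

object SavepointDisposal {
  def disposeSavepoint(store: SavepointStore, msg: DisposeSavepointWithJars): Unit = {
    // Parent-first delegation: Flink's own classes come from the parent loader,
    // user classes (e.g. the fold function in the stack trace) from the JARs.
    val userCodeLoader = new URLClassLoader(msg.userJars.toArray, getClass.getClassLoader)
    try {
      val savepoint = store.getState(msg.savepointPath)
      // Use the user-code loader instead of getClass.getClassLoader
      savepoint.discard(userCodeLoader)
      // Only then remove the savepoint itself
      store.disposeState(msg.savepointPath)
    } finally {
      userCodeLoader.close() // release the JAR file handles
    }
  }
}
```

The sketch closes the loader right after disposal. This assumes `discard` finishes synchronously; if disposal were asynchronous, the loader would have to stay open until it completes.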
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232116#comment-15232116 ] Konstantin Knauf commented on FLINK-3713: Works as expected.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230124#comment-15230124 ] Konstantin Knauf commented on FLINK-3713: I will give that a try and report.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230103#comment-15230103 ] Ufuk Celebi commented on FLINK-3713: It applies to any custom state (I don't know for sure whether it applied only to custom state backends when savepoints were introduced). Robert suggested a workaround: add the user JAR to Flink's classpath (e.g. by adding it to the lib folder of your installation). Can you try this workaround? The easiest fix will be to require the user JAR when discarding savepoints. This sounds like something we need to add to 1.0.2 soon.
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230058#comment-15230058 ] Konstantin Knauf commented on FLINK-3713: Just for my understanding, the linked page says: {quote} Disposing custom state handles: Disposing an old savepoint does not work with custom state handles (if you are using a custom state backend), because the user code class loader is not available during disposal. {quote} We are using the FsStateBackend, not a custom state backend. Or does "custom state backend" always mean user classes? Let me give you some context; maybe we can come up with a different solution. We have a streaming job that should always be running. Normally, Flink should handle failures, but we cannot rely on it alone (max restart attempts, some bug, ...). So we have a script that runs regularly and checks whether the job is still running. If the job has not been running for a period of x minutes (to allow for automatic recovery by Flink), it is restarted from a savepoint. Every time this script runs, a new savepoint is triggered and the old one is discarded. We need these regular savepoints to be able to do this "manual" recovery, so we trigger quite a lot of savepoints and need some way of cleaning up the old ones. Do you have a different idea how to do this cleanup? Just deleting the directory in HDFS? If you have a completely different idea how to handle this supervision, I am also happy to hear suggestions.
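The supervision script described above can be sketched as a small state machine. It keeps the latest savepoint, disposes the previous one after each new savepoint, and resumes from the latest savepoint once the job has been down for longer than a grace period (the "x minutes" left for Flink's own recovery). `JobControl` and `Supervisor` are hypothetical names. A real script would presumably back `JobControl` with the `flink` command-line client (e.g. `flink savepoint <jobID>`, `flink savepoint -d <savepointPath>`, `flink run -s <savepointPath> ...`), which is an assumption here and not part of the original comment.

```scala
// Hypothetical abstraction over whatever the real script calls (CLI, REST, ...).
trait JobControl {
  def isRunning: Boolean
  def triggerSavepoint(): String // returns the path of the new savepoint
  def disposeSavepoint(path: String): Unit
  def resumeFrom(path: String): Unit
}

// One Supervisor instance is kept across the periodic runs of the script.
final class Supervisor(job: JobControl, gracePeriodMillis: Long) {
  private var latestSavepoint: Option[String] = None
  private var notRunningSince: Option[Long] = None

  def tick(nowMillis: Long): Unit =
    if (job.isRunning) {
      notRunningSince = None
      // Take a fresh savepoint first, then discard the previous one,
      // so there is always at least one savepoint to recover from.
      val fresh = job.triggerSavepoint()
      latestSavepoint.foreach(job.disposeSavepoint)
      latestSavepoint = Some(fresh)
    } else {
      val since = notRunningSince.getOrElse(nowMillis)
      notRunningSince = Some(since)
      // Give Flink's own restart strategy time before stepping in
      if (nowMillis - since >= gracePeriodMillis) {
        latestSavepoint.foreach(job.resumeFrom)
        notRunningSince = None
      }
    }
}
```

Ordering "trigger, then dispose" means a failed trigger never leaves the job without a savepoint. With this issue unfixed, however, the dispose step is exactly the call that hits the `ClassNotFoundException`.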
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230050#comment-15230050 ] Robert Metzger commented on FLINK-3713: One possible workaround for the issue is the following: you can put the job JAR into the lib/ folder of Flink. Then the user code is available through the system classloader, and Flink can find the class when discarding the state.
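The stack trace in the issue description shows where the lookup fails: `InstantiationUtil$ClassLoaderObjectInputStream.resolveClass` resolves each class name in the serialized state through the class loader it was handed. Below is a minimal sketch of that technique, not Flink's actual source. It is an `ObjectInputStream` that resolves classes through a supplied loader. When that loader is the system class loader, a user class such as `MetricsProcessor$CombinedKeysFoldFunction` is found only if the job JAR is on the system classpath, which is what the lib/ workaround provides. `JavaSerde` is a hypothetical helper name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Resolves class names through `loader` instead of the stream's default lookup.
class ClassLoaderObjectInputStream(in: InputStream, loader: ClassLoader) extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName, false, loader)
    catch {
      // Fall back to the default behaviour (e.g. for primitive type descriptors)
      case _: ClassNotFoundException => super.resolveClass(desc)
    }
}

// Hypothetical helper: plain Java serialization round trip with an explicit loader.
object JavaSerde {
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](data: Array[Byte], loader: ClassLoader): T = {
    val in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Deserializing a `HashMap` whose values are user objects calls `resolveClass` for each value's class, matching the `HashMap.readObject` frames in the trace. Passing a user-code loader (or a system classpath that contains the job JAR) is what lets that lookup succeed.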
[jira] [Commented] (FLINK-3713) DisposeSavepoint message uses system classloader to discard state
[ https://issues.apache.org/jira/browse/FLINK-3713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230043#comment-15230043 ] Ufuk Celebi commented on FLINK-3713: This is unfortunately a known limitation (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html#current-limitations), which I only thought of after I was "done" with the initial pull request preparations. How much of a headache is this for you right now?