Hi Reo,

I do not think Flink guarantees that a running job survives an in-place
upgrade of the JM and TMs.

The usual, supported way to upgrade is to:
- take a savepoint of the job
- upgrade the cluster (JM and TMs)
- rebuild the job against the new Flink version, if needed
- start the job from the savepoint [1]

Externalised checkpoints are not guaranteed to be compatible across Flink
versions either.

In your particular case, the type of a field in the ResourceSpec class
(cpuCores, according to your log) has indeed changed, so its deserialisation
failed when the JM tried to recover the job. This recovery mechanism assumes
that nothing has changed, neither the job nor the Flink version; it is meant
for recovering from failures.
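To illustrate the failure mode without Flink, here is a minimal, self-contained Java sketch. It assumes, as the quoted stack trace suggests (InstantiationUtil.deserializeObject called from ZooKeeperJobGraphStore.recoverJobGraph), that the persisted JobGraph is read back with plain Java serialization, which matches fields by name and rejects a primitive field whose recorded type differs from the local class. FakeResourceSpec and patchPrimitiveFieldType are hypothetical names; rewriting the recorded type of cpuCores from double to long only imitates bytes written by a class version whose field type was different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class IncompatibleFieldDemo {

    // Hypothetical stand-in for a class such as ResourceSpec; this is NOT Flink's class.
    static class FakeResourceSpec implements Serializable {
        // Same UID on both sides, so only the field-type check can fail.
        private static final long serialVersionUID = 1L;
        private final double cpuCores;

        FakeResourceSpec(double cpuCores) {
            this.cpuCores = cpuCores;
        }
    }

    // Serializes an object, rewrites the recorded type of cpuCores to imitate bytes
    // written by another class version, then tries to read the bytes back.
    static String simulateIncompatibleField() {
        try {
            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytesOut)) {
                out.writeObject(new FakeResourceSpec(1.0));
            }
            byte[] bytes = bytesOut.toByteArray();
            // 'D' = double, 'J' = long: both 8 bytes wide, so the stream stays well formed.
            patchPrimitiveFieldType(bytes, "cpuCores", 'D', 'J');
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                in.readObject();
                return "deserialized without error";
            }
        } catch (InvalidClassException e) {
            // Thrown while matching the stream's fields against the local class.
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // A primitive field descriptor is written as a one-byte type code followed by
    // the field name in modified UTF-8 (2-byte length, then the name's bytes).
    private static void patchPrimitiveFieldType(byte[] bytes, String field, char from, char to) {
        byte[] name = field.getBytes(StandardCharsets.UTF_8);
        for (int i = 3; i <= bytes.length - name.length; i++) {
            boolean match = bytes[i - 3] == from
                    && bytes[i - 2] == (byte) (name.length >>> 8)
                    && bytes[i - 1] == (byte) name.length;
            for (int j = 0; match && j < name.length; j++) {
                match = bytes[i + j] == name[j];
            }
            if (match) {
                bytes[i - 3] = (byte) to;
                return;
            }
        }
        throw new IllegalStateException("no descriptor found for field " + field);
    }

    public static void main(String[] args) {
        System.out.println(simulateIncompatibleField());
    }
}
```

Running main should print an InvalidClassException message of the same shape as the one in the quoted log: the class name, followed by "incompatible types for field cpuCores".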

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html

On Wed, Mar 18, 2020 at 4:27 PM Reo Lei <leinuo...@gmail.com> wrote:

> Hi all,
>
> I encountered a problem when upgrading Flink from 1.9.1 to 1.10.0.
>
> At first, my job was running stably on Flink, with both the JM and TM on
> 1.9.1. Then I tried to upgrade to 1.10.0: I stopped the JM process and
> started another JM process. At that point the JM was 1.10.0 and the TM was
> still 1.9.1, but the JM did not start successfully.
>
> I checked the JM log and found that the JM tried to recover the running
> jobs, but failed. The error log is as follows.
>
> I was also wondering: is there any way to upgrade the JM and TM without
> cancelling the running job?
>
>
> 2020-03-18 18:21:11.623 [main] INFO
>  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   - Rest
> endpoint listening at 0.0.0.0:8880
> 2020-03-18 18:21:11.623 [main] INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
> 2020-03-18 18:21:11.724 [main] INFO
>  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   - Web
> frontend listening at http://0.0.0.0:8880.
> 2020-03-18 18:21:11.769 [main] INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService             - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
> akka://flink/user/resourcemanager .
> 2020-03-18 18:21:11.836 [main] INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/dispatcher_lock'}.
> 2020-03-18 18:21:11.859 [main] INFO
>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2020-03-18 18:21:11.859 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService -
> Starting ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/resource_manager_lock'}.
> 2020-03-18 18:21:11.859 [main] INFO
>  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
> Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> 2020-03-18 18:21:28.015 [main-EventThread] INFO
>  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   -
> http://0.0.0.0:8880 was granted leadership with
> leaderSessionID=2120c8d2-db56-447e-8643-9c5739f26349
> 2020-03-18 18:21:28.016 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> ResourceManager akka.tcp://flink@192.168.100.162:6123/user/resourcemanager
> was granted leadership with fencing token 813537a026af401a06441c182b8c453f
> 2020-03-18 18:21:28.020 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl -
> Starting the SlotManager.
> 2020-03-18 18:21:28.086 [main-EventThread] INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Start SessionDispatcherLeaderProcess.
> 2020-03-18 18:21:28.128 [cluster-io-thread-1] INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Recover all persisted job graphs.
> 2020-03-18 18:21:28.151 [flink-akka.actor.default-dispatcher-2] INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
> Registering TaskManager with ResourceID bccd9536c1483eff42f9e31c9106cf52
> (akka.tcp://flink@127.0.0.1:34120/user/taskmanager_0) at ResourceManager
> 2020-03-18 18:21:28.158 [cluster-io-thread-1] INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Trying to recover job with job id b6f7fd9c542e7570ba4c19e040ab946d.
> 2020-03-18 18:21:28.301 [cluster-io-thread-1] INFO
>  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Stopping SessionDispatcherLeaderProcess.
> 2020-03-18 18:21:28.305 [cluster-io-thread-1] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        - Fatal error
> occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
> id b6f7fd9c542e7570ba4c19e040ab946d.
>     at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover
> job with job id b6f7fd9c542e7570ba4c19e040ab946d.
>     at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
>     at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
>     at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:184)
>     at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
>     at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     ... 3 common frames omitted
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /b6f7fd9c542e7570ba4c19e040ab946d. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
>     at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:191)
>     at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
>     ... 7 common frames omitted
> Caused by: java.io.InvalidClassException:
> org.apache.flink.api.common.operators.ResourceSpec; incompatible types for
> field cpuCores
>     at java.io.ObjectStreamClass.matchFields(ObjectStreamClass.java:2399)
>     at java.io.ObjectStreamClass.getReflector(ObjectStreamClass.java:2293)
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:741)
>     at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2033)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at java.util.ArrayList.readObject(ArrayList.java:797)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at java.util.ArrayList.readObject(ArrayList.java:797)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at java.util.ArrayList.readObject(ArrayList.java:797)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at java.util.ArrayList.readObject(ArrayList.java:797)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at java.util.HashMap.readObject(HashMap.java:1409)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2158)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
>     at
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>     at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:185)
>     ... 8 common frames omitted
> 2020-03-18 18:21:28.322 [BlobServer shutdown hook] INFO
>  org.apache.flink.runtime.blob.BlobServer                     - Stopped
> BLOB server at 0.0.0.0:45287
>
>
>
