With 1.12.1 it happened quite often; with 1.12.2 not that much. I think I saw
it once or twice in ~20 cancels. When it happened, the job actually restarted
on cancel. I did not grab the log at that time, but chances are good that I
will be able to reproduce it.
Thanks,
Alexey

________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Sunday, March 14, 2021 7:50:21 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

If the HA-related ConfigMaps still exist, then I am afraid the data located on
the distributed storage also still exists.
So I suggest deleting the HA-related storage as well.

Deleting all the HA-related data manually should help in your current
situation. After that, you can recover from the new savepoint.
However, I do not think this is normal behavior, since when the application
reaches a terminal state (e.g. FINISHED, FAILED, CANCELED),
all HA-related data should be cleaned up automatically.
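
As a rough sketch of the manual cleanup described above, the following Python
helper only builds the kubectl commands and the storage path to inspect; it
does not run anything. The label selector
`app=<cluster-id>,configmap-type=high-availability` and the
`<high-availability.storageDir>/<cluster-id>` layout are assumptions based on
the Flink Kubernetes HA documentation, and the cluster id `gsp`, the namespace,
and the storage directory are hypothetical values. Verify all of them against
your own deployment before deleting anything.

```python
def ha_configmap_commands(cluster_id, namespace="default"):
    """Return kubectl argv lists to list, then delete, the HA ConfigMaps.

    Assumption: Flink labels its HA ConfigMaps with
    app=<cluster-id>,configmap-type=high-availability.
    """
    selector = f"app={cluster_id},configmap-type=high-availability"
    base = ["kubectl", "-n", namespace]
    return [
        # List first, so the selection can be reviewed before deleting.
        base + ["get", "configmaps", "--selector", selector],
        base + ["delete", "configmaps", "--selector", selector],
    ]


def ha_storage_path(storage_dir, cluster_id):
    """Return the directory assumed to hold this cluster's HA data.

    Assumption: HA metadata lives under
    <high-availability.storageDir>/<cluster-id>.
    """
    return storage_dir.rstrip("/") + "/" + cluster_id


if __name__ == "__main__":
    # Hypothetical values; replace with your own cluster id and storage dir.
    for argv in ha_configmap_commands("gsp", namespace="flink"):
        print(" ".join(argv))
    print(ha_storage_path("wasbs://container@account/ha/", "gsp"))
```

Printing the commands instead of executing them keeps the destructive step a
deliberate, manual action.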

Could you provide the JobManager logs from when you try to cancel the
job? I believe `kubectl logs -f {pod_name}` will stream
the logs in real time.

Best,
Yang

On Fri, Mar 12, 2021 at 12:47 AM, Alexey Trenikhun <yen...@msn.com> wrote:
Hi Yang,
The upgrade procedure uses POST /jobs/:jobid/savepoints with cancel-job=true,
but perhaps I hit FLINK-21028. This leads to a question: if the normal
take-savepoint-and-cancel-job via the API fails, what steps should be taken
outside Flink to be able to resume from the savepoint with a new job version?
Is deleting the Kubernetes Job and the HA ConfigMaps enough, or should
something in persisted storage be deleted as well?
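
For reference, a minimal Python sketch of the savepoint-and-cancel call
mentioned above. It builds the POST request and interprets the status document
returned by polling GET /jobs/:jobid/savepoints/:triggerid. The body fields
(`target-directory`, `cancel-job`) and the response shape (`status.id`,
`operation.location`, `operation.failure-cause`) are written from memory of the
Flink REST API and should be checked against the documentation for your
version. The function names and all example values are hypothetical.

```python
import json
import urllib.request


def build_savepoint_request(base_url, job_id, target_directory):
    """Build the POST that triggers a savepoint and cancels the job."""
    body = {"target-directory": target_directory, "cancel-job": True}
    return urllib.request.Request(
        url=f"{base_url}/jobs/{job_id}/savepoints",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def interpret_status(status_response):
    """Map a polled status document to a (state, detail) tuple.

    Assumed shape: {"status": {"id": ...}, "operation": {...}} where a
    completed operation carries either "location" or "failure-cause".
    """
    if status_response["status"]["id"] != "COMPLETED":
        return ("IN_PROGRESS", None)
    operation = status_response.get("operation", {})
    if "failure-cause" in operation:
        return ("FAILED", operation["failure-cause"])
    return ("COMPLETED", operation.get("location"))


if __name__ == "__main__":
    # Hypothetical endpoint and target directory; nothing is sent here.
    request = build_savepoint_request(
        "http://localhost:8081",
        "00000000000000000000000000000000",
        "wasbs://container@account/savepoints",
    )
    print(request.get_method(), request.full_url)
```

An upgrade script would only start the new job version once the state is
COMPLETED and a location is present; a FAILED result is the case the question
above asks about.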

Thanks,
Alexey
________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Thursday, March 11, 2021 2:59 AM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

From your attached logs, it seems that the leader-related ConfigMap is reused,
so the Flink application is recovered instead of a new one being submitted.
This is the root cause of it trying to recover from the wrong savepoint, the
one specified in your previous submission.

> So how to fix this?
If you want to stop the application, I strongly suggest canceling the Flink
job with a savepoint instead of directly deleting all the K8s resources. After
that, you will find that the leader-related ConfigMaps are deleted
automatically once the job is canceled.

Best,
Yang

On Wed, Mar 10, 2021 at 12:16 PM, Alexey Trenikhun <yen...@msn.com> wrote:
Hi Yang,
The problem has reoccurred; the full JM log is attached.

Thanks,
Alexey
________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) 
savepoint

Hi Alexey,

It seems that the KubernetesHAService works well since all the checkpoints have 
been cleaned up when the job is canceled.
And we could find related logs "Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering
from the wrong savepoint path. Could you share the full JobManager logs? One
possible reason I can think of is that the application cluster entrypoint is
not creating a new JobGraph from the specified arguments.


Best,
Yang

On Sat, Feb 27, 2021 at 1:48 AM, Alexey Trenikhun <yen...@msn.com> wrote:
Hello,
We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM
is deployed as a Job, a single TM as a StatefulSet). We took a savepoint with
cancel=true. Now we are trying to start the job using --fromSavepoint A, where
A is the path we got from taking the savepoint (ClusterEntrypoint reports A in
the log), but it looks like the job for some reason ignores the given A and
actually tries to restore from some path B (CheckpointCoordinator logs B):

{"ts":"2021-02-26T17:09:52.500Z","message":" Program Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.502Z","message":"    00000000000000000000000000000000","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
...
{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"Starting job 00000000000000000000000000000000 from savepoint wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685 (allowing non restored state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id: 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.273Z","message":"Fatal error occurred in the cluster 
entrypoint.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-4","level":"ERROR","level_value":40000,"stack_trace":"org.apache.flink.util.FlinkException:
 JobMaster for job 00000000000000000000000000000000 failed.\n\tat 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:890)\n\tat
 
org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465)\n\tat
 
org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:444)\n\tat
 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423)\n\tat
 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)\n\tat
 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat
 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)\n\tat
 akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat 
akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat 
akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat 
akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat 
akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
 akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused
 by: org.apache.flink.runtime.client.JobInitializationException: Could not 
instantiate JobManager.\n\tat 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)\n\tat
 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\tat
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
 java.lang.Thread.run(Thread.java:748)\nCaused by: 
java.io.FileNotFoundException: Cannot find checkpoint or savepoint 
file/directory 
'wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685'
 on file system 'wasbs'.\n\tat 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat
 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat
 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat
 
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat
 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat
 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat
 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat
 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat
 org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat
 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat
 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat
 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat
 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t...
 4 common frames omitted\n"}

Any suggestions?

Thanks,
Alexey
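
The mismatch reported in this message (path A passed via --fromSavepoint, path
B actually restored) can be checked mechanically. The Python sketch below scans
JSON log records for both values. It assumes each record sits on a single
line, as in the raw JobManager log, and relies on the message texts seen in
this thread ("--fromSavepoint" followed by the path in the next record, and
"Starting job ... from savepoint ..."). The function name and the sample paths
are hypothetical.

```python
import json
import re

# Matches the CheckpointCoordinator message and captures the restored path.
RESTORE_PATTERN = re.compile(r"Starting job \S+ from savepoint (\S+)")


def savepoint_paths(log_lines):
    """Return (requested, restored) savepoint paths found in JSON log lines."""
    requested = None
    restored = None
    expect_path = False
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip elisions such as "..." and other non-JSON lines
        if not isinstance(record, dict):
            continue
        message = str(record.get("message", "")).strip()
        if expect_path:
            # The record right after "--fromSavepoint" carries the path.
            requested = message
            expect_path = False
        elif message == "--fromSavepoint":
            expect_path = True
        else:
            match = RESTORE_PATTERN.search(message)
            if match:
                restored = match.group(1)
    return requested, restored


if __name__ == "__main__":
    # Hypothetical sample records shaped like the log in this thread.
    sample = [
        '{"message":"    --fromSavepoint"}',
        '{"message":"    wasbs://c/savepoints/savepoint-A"}',
        '...',
        '{"message":"Starting job 00 from savepoint wasbs://c/savepoints/savepoint-B (allowing non restored state)"}',
    ]
    requested, restored = savepoint_paths(sample)
    if requested != restored:
        print(f"mismatch: requested {requested}, restored {restored}")
```

A mismatch like this would be consistent with the job being recovered from
previously stored HA data rather than built from the new arguments.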
