[ https://issues.apache.org/jira/browse/FLINK-24543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aitozi updated FLINK-24543:
---------------------------
    Summary: jar  (was: Zookeeper connection issue causes inconsistent state in Flink)

> jar
> ---
>
>                 Key: FLINK-24543
>                 URL: https://issues.apache.org/jira/browse/FLINK-24543
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.13.3
>            Reporter: Jun Qin
>            Assignee: David Morávek
>            Priority: Blocker
>              Labels: pull-request-available
>
> Env: Flink 1.13.2 with ZooKeeper HA
> Here is what happened:
> {code:bash}
> # checkpoint 1116 was triggered
> 2021-10-09 00:16:49,555 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job a8a4fb85b681a897ba118db64333c9e5.
> # a few seconds later, the ZooKeeper connection was suspended (it turned out to be a disk issue on the ZooKeeper side that caused slow fsync and commit)
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.
> # the job switched to SUSPENDED
> 2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).
> 2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink ... (a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to SUSPENDED.
> 2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.
> 2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.
> # ZooKeeper connection restored
> 2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> # a checkpoint acknowledgement was received and the checkpoint was finalized, but adding it to ZooKeeper failed with KeeperException$NodeExistsException
> 2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: ... (1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to CANCELING.
> 2021-10-09 00:17:14,382 [jobmanager-future-thread-1] WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 1116. Failure reason: Failure to finalize checkpoint.
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072)
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>       at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: ZooKeeper node /0000000000000001116 already exists.
>       at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.lambda$addAndLock$0(ZooKeeperStateHandleStore.java:179)
>       at java.util.Optional.map(Optional.java:265) ~[?:?]
>       at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:177)
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>       ... 9 more
> Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>       at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:122) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1015) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:919) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:197) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.access$000(CuratorTransactionImpl.java:37) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:130) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:126) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:123) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>       at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.writeStoreHandleTransactionally(ZooKeeperStateHandleStore.java:204)
>       at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:165)
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>       ... 9 more
> # the checkpoint coordinator was stopped
> 2021-10-09 00:17:14,385 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:17:14,401 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job a8a4fb85b681a897ba118db64333c9e5 has been suspended.
> # clean up
> 2021-10-09 00:17:14,403 [AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}
> 2021-10-09 00:17:14,404 [cluster-io-thread-2] INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Released job graph a8a4fb85b681a897ba118db64333c9e5 from ZooKeeperStateHandleStore{namespace='flink/flink-.../jobgraphs'}.
> # however, during recovery checkpoint 1116 was found in ZooKeeper, but its metadata file /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 had already been cleaned up because of the earlier KeeperException$NodeExistsException
> 2021-10-09 00:18:18,678 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 4 checkpoints in ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 4 checkpoints from storage.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1113.
> 2021-10-09 00:18:18,689 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1114.
> 2021-10-09 00:18:18,691 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1115.
> 2021-10-09 00:18:18,693 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1116.
> 2021-10-09 00:18:18,700 [flink-akka.actor.default-dispatcher-18] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job a8a4fb85b681a897ba118db64333c9e5 failed.
>       at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
>       at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
>       at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>       at akka.actor.Actor.aroundReceive(Actor.scala:517)
>       at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>       at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>       at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>       at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>       at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>       at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.io.FileNotFoundException: /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 (No such file or directory)
>       at java.io.FileInputStream.open0(Native Method) ~[?:?]
>       at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
>       at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
>       at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>       at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
>       at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:298)
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>       at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>       at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>       at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>       at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>       at java.lang.Thread.run(Thread.java:834) ~[?:?]
>  {code}
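
The log is consistent with one possible sequence, though it does not prove it. The ZooKeeper write for checkpoint 1116 may have been applied on the server while the client saw a connection problem. The retry loop (RetryLoop.callWithRetry in the trace) then re-sent the transaction and got NodeExistsException. Flink treated the checkpoint as failed and deleted its metadata file, while the ZooKeeper node pointing to that file remained, which matches the FileNotFoundException during recovery. The Java sketch below simulates that sequence with in-memory stand-ins. FakeZk, createWithRetry, naiveAdd, safeAdd and recover are hypothetical names, not Flink or Curator APIs. The read-back check in safeAdd is only one illustrative mitigation, not the fix adopted for this issue.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the sequence above; none of these types are Flink or Curator APIs.
public class StaleZkNodeDemo {

    static class ConnectionLossException extends Exception {}
    static class NodeExistsException extends Exception {}

    /** In-memory stand-in for a ZooKeeper namespace: node path -> metadata file the node points to. */
    static class FakeZk {
        final Map<String, String> nodes = new HashMap<>();
        // When true, the next create is applied on the "server" but its reply is lost.
        boolean dropNextReply = true;

        void create(String path, String data) throws ConnectionLossException, NodeExistsException {
            if (nodes.containsKey(path)) {
                throw new NodeExistsException();
            }
            nodes.put(path, data); // the write takes effect...
            if (dropNextReply) {
                dropNextReply = false;
                throw new ConnectionLossException(); // ...but the client never sees the reply
            }
        }
    }

    static final int MAX_ATTEMPTS = 3;

    /** Retries on connection loss, loosely modelled on a client-side retry loop. */
    static void createWithRetry(FakeZk zk, String path, String data) throws NodeExistsException {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                zk.create(path, data);
                return;
            } catch (ConnectionLossException e) {
                // Outcome of this attempt is unknown; try again.
            }
        }
        throw new IllegalStateException("gave up after " + MAX_ATTEMPTS + " attempts");
    }

    /** Naive handling: NodeExists is treated as failure and the metadata file is deleted. */
    static String naiveAdd(FakeZk zk, Set<String> files, String path, String file) {
        files.add(file); // metadata file is written before the ZooKeeper update
        try {
            createWithRetry(zk, path, file);
        } catch (NodeExistsException e) {
            files.remove(file); // checkpoint discarded, but the node may still point at this file
        }
        return recover(zk, files, path);
    }

    /** Illustrative mitigation: read the node back before deleting anything. */
    static String safeAdd(FakeZk zk, Set<String> files, String path, String file) {
        files.add(file);
        try {
            createWithRetry(zk, path, file);
        } catch (NodeExistsException e) {
            // Assumes metadata file names are unique, so equal data means our own earlier attempt landed.
            if (!file.equals(zk.nodes.get(path))) {
                files.remove(file);
            }
        }
        return recover(zk, files, path);
    }

    /** Recovery check: a node whose metadata file is gone is the "broken state handle" case. */
    static String recover(FakeZk zk, Set<String> files, String path) {
        String file = zk.nodes.get(path);
        if (file == null) {
            return "absent";
        }
        return files.contains(file) ? "ok" : "dangling";
    }

    public static void main(String[] args) {
        String path = "/0000000000000001116";
        System.out.println("naive: " + naiveAdd(new FakeZk(), new HashSet<>(), path, "completedCheckpointA"));
        System.out.println("safe:  " + safeAdd(new FakeZk(), new HashSet<>(), path, "completedCheckpointA"));
    }
}
{code}

Running main prints "naive: dangling" and then "safe:  ok". The "dangling" result is the in-memory analogue of the ZooKeeper node /0000000000000001116 pointing at a deleted completedCheckpoint file. Comparing the node content only works here because each metadata file name is assumed to be unique.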



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
