Hi Li,

Roman is right about Flink's behavior and what you can do about it. The idea behind the current behavior is the following: if Flink cannot recover a job, it is very hard for it to tell whether the cause is an intermittent problem or a permanent one. No matter how often you retry, you can always run into the situation that you give up too early. Giving up would be very surprising behavior, because it effectively means that Flink can forget about jobs during a recovery, so we decided that this situation requires the user to step in and resolve it. By forcing the user to make a decision, we make the problem very explicit and require her to think about the situation. I hope this makes sense.

So in your case, what you have to do is to remove the relevant ZooKeeper zNode which contains the pointer to the submitted job graph file. That way, Flink will no longer try to recover this job. I do agree that this is a bit cumbersome and it could definitely help to offer a small tool to do this kind of cleanup task.

Cheers,
Till

On Fri, Jun 11, 2021 at 8:24 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Li,
>
> If I understand correctly, you want the cluster to proceed recovery, skipping some non-recoverable jobs (but still recover others).
> The only way I can think of is to remove the corresponding nodes in ZooKeeper which is not very safe.
>
> I'm pulling in Robert and Till who might know better.
>
> Regards,
> Roman
>
>
> On Thu, Jun 10, 2021 at 8:56 PM Li Peng <li.p...@doordash.com> wrote:
> >
> > Hi Roman,
> >
> > Is there a way to abandon job recovery after a few tries? By that I mean that this problem was fixed by me restarting the cluster and not try to recover a job. Is there some setting that emulates what I did, so I don't need to do manual intervention if this happens again??
> >
> > Thanks,
> > Li
> >
> > On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan <ro...@apache.org> wrote:
> >>
> >> Hi Li,
> >>
> >> The missing file is a serialized job graph and the job recovery can't proceed without it.
> >> Unfortunately, the cluster can't proceed if one of the jobs can't recover.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, Jun 10, 2021 at 6:02 AM Li Peng <li.p...@doordash.com> wrote:
> >> >
> >> > Hey folks, we have a cluster with HA mode enabled, and recently after doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 2.12) crashed and was stuck in a crash loop, with the following error:
> >> >
> >> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
> >> > java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00000000000000000000000000000000.
> >> >     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
> >> >     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
> >> >     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
> >> >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >> >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >> >     at java.base/java.lang.Thread.run(Thread.java:834)
> >> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00000000000000000000000000000000.
> >> >     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
> >> >     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
> >> >     at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
> >> >     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
> >> >     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> >> >     ... 3 common frames omitted
> >> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /00000000000000000000000000000000. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
> >> >     at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
> >> >     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
> >> >     ... 7 common frames omitted
> >> > Caused by: java.io.FileNotFoundException: No such file or directory: s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
> >> >     at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> >> >     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> >> >     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> >> >     at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
> >> >     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
> >> >     at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
> >> >     at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
> >> >     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
> >> >     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
> >> >     at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
> >> >     at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
> >> >     at org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
> >> >     ... 8 common frames omitted
> >> >
> >> > We have an idea of why the file might be gone and are addressing it, but my question is: how can I configure this in such a way so that a missing job file doesn't trap the cluster in a forever restart loop? Is there some setting to just treat this like a complete fresh deployment if the recovery file is missing?
> >> >
> >> > Thanks!
> >> > Li
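
For reference, the zNode cleanup described in the reply at the top of this thread can be sketched roughly as follows. This is only a sketch, not an official Flink tool. The helper names `job_graph_znode` and `cleanup_commands` are made up for illustration, and the defaults assume the standard HA settings (`high-availability.zookeeper.path.root: /flink`, `high-availability.cluster-id: /default`, `high-availability.zookeeper.path.jobgraphs: /jobgraphs`); your cluster may well use different values. The script does not talk to ZooKeeper at all, it only prints the zkCli commands you would run by hand.

```python
HEX_DIGITS = set("0123456789abcdef")


def job_graph_znode(job_id, root="/flink", cluster_id="/default", jobgraphs="/jobgraphs"):
    """Build the zNode path that is assumed to hold the job graph pointer.

    The defaults mirror what we assume to be Flink's default HA settings;
    pass your own values if flink-conf.yaml overrides them.
    """
    # A Flink job id is printed as 32 hexadecimal characters.
    if len(job_id) != 32 or not set(job_id.lower()) <= HEX_DIGITS:
        raise ValueError(f"not a 32-character hex job id: {job_id!r}")
    # Normalise leading/trailing slashes so "/flink/" and "flink" both work.
    parts = [p.strip("/") for p in (root, cluster_id, jobgraphs, job_id)]
    return "/" + "/".join(p for p in parts if p)


def cleanup_commands(job_id, **ha_paths):
    """Return the zkCli commands: inspect the zNode first, then delete it."""
    znode = job_graph_znode(job_id, **ha_paths)
    return [
        f"ls {znode}",         # check that this really is the broken job
        f"deleteall {znode}",  # removes the zNode and any children under it
    ]


if __name__ == "__main__":
    # Job id taken from the stack trace quoted above.
    for command in cleanup_commands("00000000000000000000000000000000"):
        print(command)
```

Run the printed commands inside `zkCli.sh` connected to the quorum that Flink uses. Older zkCli versions offer `rmr` instead of `deleteall`. Since editing ZooKeeper by hand is not very safe, it is probably best to check the `ls` output against your own setup first and to do the deletion while the JobManagers are stopped.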