Hi Li,

If I understand correctly, you want the cluster to proceed recovery,
skipping some non-recoverable jobs (but still recover others).
The only way I can think of is to remove the corresponding nodes in
ZooKeeper which is not very safe.

I'm pulling in Robert and Till who might know better.

Regards,
Roman


On Thu, Jun 10, 2021 at 8:56 PM Li Peng <li.p...@doordash.com> wrote:
>
> Hi Roman,
>
> Is there a way to abandon job recovery after a few tries? By that I mean that 
> this problem was fixed by me restarting the cluster and not try to recover a 
> job. Is there some setting that emulates what I did, so I don't need to do 
> manual intervention if this happens again??
>
> Thanks,
> Li
>
> On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi Li,
>>
>> The missing file is a serialized job graph and the job recovery can't
>> proceed without it.
>> Unfortunately, the cluster can't proceed if one of the jobs can't recover.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 10, 2021 at 6:02 AM Li Peng <li.p...@doordash.com> wrote:
>> >
>> > Hey folks, we have a cluster with HA mode enabled, and recently after 
>> > doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 
>> > 2.12) crashed and was stuck in a crash loop, with the following error:
>> >
>> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR 
>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error 
>> > occurred in the cluster entrypoint.
>> > java.util.concurrent.CompletionException: 
>> > org.apache.flink.util.FlinkRuntimeException: Could not recover job with 
>> > job id 00000000000000000000000000000000.
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>> > at 
>> > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> > at 
>> > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> > at java.base/java.lang.Thread.run(Thread.java:834)
>> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover 
>> > job with job id 00000000000000000000000000000000.
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>> > ... 3 common frames omitted
>> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve 
>> > submitted JobGraph from state handle under 
>> > /00000000000000000000000000000000. This indicates that the retrieved state 
>> > handle is broken. Try cleaning the state handle store.
>> > at 
>> > org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
>> > ... 7 common frames omitted
>> > Caused by: java.io.FileNotFoundException: No such file or directory: 
>> > s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>> > at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>> > at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>> > at 
>> > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
>> > at 
>> > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
>> > at 
>> > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>> > at 
>> > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>> > at 
>> > org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
>> > at 
>> > org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>> > at 
>> > org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:186)
>> > ... 8 common frames omitted
>> >
>> > We have an idea of why the file might be gone and are addressing it, but 
>> > my question is: how can I configure this in such a way so that a missing 
>> > job file doesn't trap the cluster in a forever restart loop? Is there some 
>> > setting to just treat this like a complete fresh deployment if the 
>> > recovery file is missing?
>> >
>> > Thanks!
>> > Li
>> >
>> >
>> >
>> >

Reply via email to