Re: JobManager restarts on job failure
That's a good point. I forgot about these options. You're right. Cleanup wouldn't be done in that case. So, upgrading would be a viable option as you suggested. Matthias On Mon, Sep 26, 2022 at 12:53 PM Gyula Fóra wrote: > Maybe it is a stupid question but in Flink 1.15 with the following configs > enabled: > > SHUTDOWN_ON_APPLICATION_FINISH = false > SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true > > I think jobmanager pod would not restart but simply go to a terminal > failed state right? > > Gyula > > On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl > wrote: > >> Thanks Evgeniy for reaching out to the community and Gyula for picking it >> up. I haven't looked into the k8s operator in much detail, yet. So, help me >> out if I miss something here. But I'm afraid that this is not something >> that would be fixed by upgrading to 1.15. >> The issue here is that we're recovering from an external checkpoint using >> the same job ID (the default one used for any Flink cluster in Application >> Mode) and the same cluster ID, if I understand correctly. Now, the job is >> failing during initialization. Currently, this causes a global cleanup [1]. >> All HA data including the checkpoints are going to be deleted. I created >> FLINK-29415 [2] to cover this. >> >> I'm wondering whether we could work around this problem by specifying a >> random job ID through PipelineOptionsInternal [3] in the Kubernetes >> Operator. But I haven't looked into all the consequences around that. And >> it feels wrong to make this configuration parameter publicly usable. >> >> Another option might be to use ExecutionMode.RECOVERY in case of an >> initialization failure when recovering from an external Checkpoint in >> Application Mode (like we do it for internal recovery already). >> >> I'm looking forward to your opinion. >> Matthias >> >> [1] >> https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663 >> [2] https://issues.apache.org/jira/browse/FLINK-29415 >> [3] >> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29 >> >> On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra wrote: >> >>> I see I think we have seen this issue with others before, in Flink 1.15 >>> it is solved by the newly introduced JobResultStore. The operator also >>> configures that automatically for 1.15 to avoid this. >>> >>> Gyula >>> >>> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov >>> wrote: >>> >>>> Thanks for the answer. >>>> I think this is not about the operator issue, kubernetes deployment >>>> just restarts the fallen pod, restarted jobmanager without HA metadata >>>> starts the job itself from an empty state. >>>> >>>> I'm looking for a way to prevent it from exiting in case of an >>>> job error (we use application mode cluster). >>>> >>>> >>>> >>>> -- >>>> *От:* Gyula Fóra >>>> *Отправлено:* 20 сентября 2022 г. 19:49:37 >>>> *Кому:* Evgeniy Lyutikov >>>> *Копия:* user@flink.apache.org >>>> *Тема:* Re: JobManager restarts on job failure >>>> >>>> The best thing for you to do would be to upgrade to Flink 1.15 and the >>>> latest operator version. >>>> In Flink 1.15 we have the option to interact with the Flink jobmanager >>>> even after the job FAILED and the operator leverages this for a much more >>>> robust behaviour. >>>> >>>> In any case the operator should not ever start the job from an empty >>>> state (even if it FAILED), if you think that's happening could you please >>>> open a JIRA ticket with the accompanying JM and Operator logs? >>>> >>>> Thanks >>>> Gyula >>>> >>>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov >>>> wrote: >>>> >>>>> Hi, >>>>> We using flink 1.14.4 with flink kubernetes operator. >>>>> >>>>> Sometimes when updating a job, it fails on startup and flink removes >>>>> all HA metadata and exits the jobmanager. >>>>> >>>>> >>>>> 2022-09-14 14:54:44,534 INFO >>>>> org.apache.flink.runtime.checkpoint.C
Re: JobManager restarts on job failure
Maybe it is a stupid question but in Flink 1.15 with the following configs enabled: SHUTDOWN_ON_APPLICATION_FINISH = false SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true I think jobmanager pod would not restart but simply go to a terminal failed state right? Gyula On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl wrote: > Thanks Evgeniy for reaching out to the community and Gyula for picking it > up. I haven't looked into the k8s operator in much detail, yet. So, help me > out if I miss something here. But I'm afraid that this is not something > that would be fixed by upgrading to 1.15. > The issue here is that we're recovering from an external checkpoint using > the same job ID (the default one used for any Flink cluster in Application > Mode) and the same cluster ID, if I understand correctly. Now, the job is > failing during initialization. Currently, this causes a global cleanup [1]. > All HA data including the checkpoints are going to be deleted. I created > FLINK-29415 [2] to cover this. > > I'm wondering whether we could work around this problem by specifying a > random job ID through PipelineOptionsInternal [3] in the Kubernetes > Operator. But I haven't looked into all the consequences around that. And > it feels wrong to make this configuration parameter publicly usable. > > Another option might be to use ExecutionMode.RECOVERY in case of an > initialization failure when recovering from an external Checkpoint in > Application Mode (like we do it for internal recovery already). > > I'm looking forward to your opinion. > Matthias > > [1] > https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663 > [2] https://issues.apache.org/jira/browse/FLINK-29415 > [3] > https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29 > > On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra wrote: > >> I see I think we have seen this issue with others before, in Flink 1.15 >> it is solved by the newly introduced JobResultStore. The operator also >> configures that automatically for 1.15 to avoid this. >> >> Gyula >> >> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov >> wrote: >> >>> Thanks for the answer. >>> I think this is not about the operator issue, kubernetes deployment just >>> restarts the fallen pod, restarted jobmanager without HA metadata >>> starts the job itself from an empty state. >>> >>> I'm looking for a way to prevent it from exiting in case of an job error >>> (we use application mode cluster). >>> >>> >>> >>> -- >>> *От:* Gyula Fóra >>> *Отправлено:* 20 сентября 2022 г. 19:49:37 >>> *Кому:* Evgeniy Lyutikov >>> *Копия:* user@flink.apache.org >>> *Тема:* Re: JobManager restarts on job failure >>> >>> The best thing for you to do would be to upgrade to Flink 1.15 and the >>> latest operator version. >>> In Flink 1.15 we have the option to interact with the Flink jobmanager >>> even after the job FAILED and the operator leverages this for a much more >>> robust behaviour. >>> >>> In any case the operator should not ever start the job from an empty >>> state (even if it FAILED), if you think that's happening could you please >>> open a JIRA ticket with the accompanying JM and Operator logs? >>> >>> Thanks >>> Gyula >>> >>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov >>> wrote: >>> >>>> Hi, >>>> We using flink 1.14.4 with flink kubernetes operator. >>>> >>>> Sometimes when updating a job, it fails on startup and flink removes >>>> all HA metadata and exits the jobmanager. >>>> >>>> >>>> 2022-09-14 14:54:44,534 INFO >>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring >>>> job from Checkpoint 30829 @ 1663167158684 >>>> for located at >>>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. >>>> 2022-09-14 14:54:44,638 INFO >>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job >>>> reached terminal state FAILED. >>>> org.apache.flink.runtime.client.JobInitializationException:
Re: JobManager restarts on job failure
Thanks Evgeniy for reaching out to the community and Gyula for picking it up. I haven't looked into the k8s operator in much detail, yet. So, help me out if I miss something here. But I'm afraid that this is not something that would be fixed by upgrading to 1.15. The issue here is that we're recovering from an external checkpoint using the same job ID (the default one used for any Flink cluster in Application Mode) and the same cluster ID, if I understand correctly. Now, the job is failing during initialization. Currently, this causes a global cleanup [1]. All HA data including the checkpoints are going to be deleted. I created FLINK-29415 [2] to cover this. I'm wondering whether we could work around this problem by specifying a random job ID through PipelineOptionsInternal [3] in the Kubernetes Operator. But I haven't looked into all the consequences around that. And it feels wrong to make this configuration parameter publicly usable. Another option might be to use ExecutionMode.RECOVERY in case of an initialization failure when recovering from an external Checkpoint in Application Mode (like we do it for internal recovery already). I'm looking forward to your opinion. Matthias [1] https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663 [2] https://issues.apache.org/jira/browse/FLINK-29415 [3] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29 On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra wrote: > I see I think we have seen this issue with others before, in Flink 1.15 it > is solved by the newly introduced JobResultStore. The operator also > configures that automatically for 1.15 to avoid this. > > Gyula > > On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov > wrote: > >> Thanks for the answer. >> I think this is not about the operator issue, kubernetes deployment just >> restarts the fallen pod, restarted jobmanager without HA metadata starts >> the job itself from an empty state. >> >> I'm looking for a way to prevent it from exiting in case of an job error >> (we use application mode cluster). >> >> >> >> -- >> *От:* Gyula Fóra >> *Отправлено:* 20 сентября 2022 г. 19:49:37 >> *Кому:* Evgeniy Lyutikov >> *Копия:* user@flink.apache.org >> *Тема:* Re: JobManager restarts on job failure >> >> The best thing for you to do would be to upgrade to Flink 1.15 and the >> latest operator version. >> In Flink 1.15 we have the option to interact with the Flink jobmanager >> even after the job FAILED and the operator leverages this for a much more >> robust behaviour. >> >> In any case the operator should not ever start the job from an empty >> state (even if it FAILED), if you think that's happening could you please >> open a JIRA ticket with the accompanying JM and Operator logs? >> >> Thanks >> Gyula >> >> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov >> wrote: >> >>> Hi, >>> We using flink 1.14.4 with flink kubernetes operator. >>> >>> Sometimes when updating a job, it fails on startup and flink removes all >>> HA metadata and exits the jobmanager. >>> >>> >>> 2022-09-14 14:54:44,534 INFO >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring >>> job from Checkpoint 30829 @ 1663167158684 >>> for located at >>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. >>> 2022-09-14 14:54:44,638 INFO >>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job >>> reached terminal state FAILED. >>> org.apache.flink.runtime.client.JobInitializationException: Could not >>> start the JobMaster. >>> Caused by: java.util.concurrent.CompletionException: >>> java.lang.IllegalStateException: There is no operator for the state >>> 4e1d9dde287c33a35e7970cbe64a40fe >>> 2022-09-14 14:54:44,930 ERROR >>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal >>> error occurred in the cluster entrypoint. >>> 2022-09-14 14:54:45,020 INFO >>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - >>> Clean up the high availability data for job >>> . >>> 2022-09-14 14:54:45,020 INFO >>> org.apache.flink.runtime.entrypoint.Cl
Re: JobManager restarts on job failure
I see I think we have seen this issue with others before, in Flink 1.15 it is solved by the newly introduced JobResultStore. The operator also configures that automatically for 1.15 to avoid this. Gyula On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov wrote: > Thanks for the answer. > I think this is not about the operator issue, kubernetes deployment just > restarts the fallen pod, restarted jobmanager without HA metadata starts > the job itself from an empty state. > > I'm looking for a way to prevent it from exiting in case of an job error > (we use application mode cluster). > > > > -- > *От:* Gyula Fóra > *Отправлено:* 20 сентября 2022 г. 19:49:37 > *Кому:* Evgeniy Lyutikov > *Копия:* user@flink.apache.org > *Тема:* Re: JobManager restarts on job failure > > The best thing for you to do would be to upgrade to Flink 1.15 and the > latest operator version. > In Flink 1.15 we have the option to interact with the Flink jobmanager > even after the job FAILED and the operator leverages this for a much more > robust behaviour. > > In any case the operator should not ever start the job from an empty state > (even if it FAILED), if you think that's happening could you please open a > JIRA ticket with the accompanying JM and Operator logs? > > Thanks > Gyula > > On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov > wrote: > >> Hi, >> We using flink 1.14.4 with flink kubernetes operator. >> >> Sometimes when updating a job, it fails on startup and flink removes all >> HA metadata and exits the jobmanager. >> >> >> 2022-09-14 14:54:44,534 INFO >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring >> job from Checkpoint 30829 @ 1663167158684 >> for located at >> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. >> 2022-09-14 14:54:44,638 INFO >> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job >> reached terminal state FAILED. >> org.apache.flink.runtime.client.JobInitializationException: Could not >> start the JobMaster. >> Caused by: java.util.concurrent.CompletionException: >> java.lang.IllegalStateException: There is no operator for the state >> 4e1d9dde287c33a35e7970cbe64a40fe >> 2022-09-14 14:54:44,930 ERROR >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal >> error occurred in the cluster entrypoint. >> 2022-09-14 14:54:45,020 INFO >> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - >> Clean up the high availability data for job >> . >> 2022-09-14 14:54:45,020 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting >> KubernetesApplicationClusterEntrypoint down with application status >> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. >> 2022-09-14 14:54:45,026 INFO >> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting >> down rest endpoint. >> 2022-09-14 14:54:46,122 INFO >> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting >> down remote daemon. >> 2022-09-14 14:54:46,321 INFO >> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting >> shut down. >> >> Kubernetes restarts the pod jobmanager and the new instance, not finding >> the HA metadata, starts the job from an empty state. >> Is there some option to prevent jobmanager from exiting after an job FAILED >> state? >> >> >> * -- *“This message contains confidential >> information/commercial secret. If you are not the intended addressee of >> this message you may not copy, save, print or forward it to any third party >> and you are kindly requested to destroy this message and notify the sender >> thereof by email. >> Данное сообщение содержит конфиденциальную информацию/информацию, >> являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом >> данного сообщения, Вы не вправе копировать, сохранять, печатать или >> пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и >> уведомить об этом отправителя электронным письмом.” >> >
Re: JobManager restarts on job failure
Thanks for the answer. I think this is not about the operator issue, kubernetes deployment just restarts the fallen pod, restarted jobmanager without HA metadata starts the job itself from an empty state. I'm looking for a way to prevent it from exiting in case of an job error (we use application mode cluster). От: Gyula Fóra Отправлено: 20 сентября 2022 г. 19:49:37 Кому: Evgeniy Lyutikov Копия: user@flink.apache.org Тема: Re: JobManager restarts on job failure The best thing for you to do would be to upgrade to Flink 1.15 and the latest operator version. In Flink 1.15 we have the option to interact with the Flink jobmanager even after the job FAILED and the operator leverages this for a much more robust behaviour. In any case the operator should not ever start the job from an empty state (even if it FAILED), if you think that's happening could you please open a JIRA ticket with the accompanying JM and Operator logs? Thanks Gyula On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov mailto:eblyuti...@avito.ru>> wrote: Hi, We using flink 1.14.4 with flink kubernetes operator. Sometimes when updating a job, it fails on startup and flink removes all HA metadata and exits the jobmanager. 2022-09-14 14:54:44,534 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job from Checkpoint 30829 @ 1663167158684 for located at s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. 2022-09-14 14:54:44,638 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job reached terminal state FAILED. org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 4e1d9dde287c33a35e7970cbe64a40fe 2022-09-14 14:54:44,930 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error occurred in the cluster entrypoint. 2022-09-14 14:54:45,020 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job . 2022-09-14 14:54:45,020 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2022-09-14 14:54:45,026 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint. 2022-09-14 14:54:46,122 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down remote daemon. 2022-09-14 14:54:46,321 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut down. Kubernetes restarts the pod jobmanager and the new instance, not finding the HA metadata, starts the job from an empty state. Is there some option to prevent jobmanager from exiting after an job FAILED state? “This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email. Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом отправителя электронным письмом.”
Re: JobManager restarts on job failure
The best thing for you to do would be to upgrade to Flink 1.15 and the latest operator version. In Flink 1.15 we have the option to interact with the Flink jobmanager even after the job FAILED and the operator leverages this for a much more robust behaviour. In any case the operator should not ever start the job from an empty state (even if it FAILED), if you think that's happening could you please open a JIRA ticket with the accompanying JM and Operator logs? Thanks Gyula On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov wrote: > Hi, > We using flink 1.14.4 with flink kubernetes operator. > > Sometimes when updating a job, it fails on startup and flink removes all > HA metadata and exits the jobmanager. > > > 2022-09-14 14:54:44,534 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring > job from Checkpoint 30829 @ 1663167158684 > for located at > s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. > 2022-09-14 14:54:44,638 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job > reached terminal state FAILED. > org.apache.flink.runtime.client.JobInitializationException: Could not > start the JobMaster. > Caused by: java.util.concurrent.CompletionException: > java.lang.IllegalStateException: There is no operator for the state > 4e1d9dde287c33a35e7970cbe64a40fe > 2022-09-14 14:54:44,930 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal > error occurred in the cluster entrypoint. > 2022-09-14 14:54:45,020 INFO > org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - > Clean up the high availability data for job > . > 2022-09-14 14:54:45,020 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting > KubernetesApplicationClusterEntrypoint down with application status > UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. > 2022-09-14 14:54:45,026 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting > down rest endpoint. > 2022-09-14 14:54:46,122 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting > down remote daemon. > 2022-09-14 14:54:46,321 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting > shut down. > > Kubernetes restarts the pod jobmanager and the new instance, not finding > the HA metadata, starts the job from an empty state. > Is there some option to prevent jobmanager from exiting after an job FAILED > state? > > > * -- *“This message contains confidential > information/commercial secret. If you are not the intended addressee of > this message you may not copy, save, print or forward it to any third party > and you are kindly requested to destroy this message and notify the sender > thereof by email. > Данное сообщение содержит конфиденциальную информацию/информацию, > являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом > данного сообщения, Вы не вправе копировать, сохранять, печатать или > пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и > уведомить об этом отправителя электронным письмом.” >
JobManager restarts on job failure
Hi, We using flink 1.14.4 with flink kubernetes operator. Sometimes when updating a job, it fails on startup and flink removes all HA metadata and exits the jobmanager. 2022-09-14 14:54:44,534 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring job from Checkpoint 30829 @ 1663167158684 for located at s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829. 2022-09-14 14:54:44,638 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job reached terminal state FAILED. org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 4e1d9dde287c33a35e7970cbe64a40fe 2022-09-14 14:54:44,930 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error occurred in the cluster entrypoint. 2022-09-14 14:54:45,020 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job . 2022-09-14 14:54:45,020 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2022-09-14 14:54:45,026 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint. 2022-09-14 14:54:46,122 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting down remote daemon. 2022-09-14 14:54:46,321 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut down. Kubernetes restarts the pod jobmanager and the new instance, not finding the HA metadata, starts the job from an empty state. Is there some option to prevent jobmanager from exiting after an job FAILED state? "This message contains confidential information/commercial secret. If you are not the intended addressee of this message you may not copy, save, print or forward it to any third party and you are kindly requested to destroy this message and notify the sender thereof by email. Данное сообщение содержит конфиденциальную информацию/информацию, являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом данного сообщения, Вы не вправе копировать, сохранять, печатать или пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и уведомить об этом отправителя электронным письмом."