Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion
Hi Tony,

It doesn't seem like the operator had much to do with this error. I wonder whether this would still happen in newer Flink versions, where the JobResultStore is already available. It would be great to try. In any case, I highly recommend upgrading to a newer Flink version for better operator integration and general stability. The next operator release (1.7.0) will drop support for Flink 1.13 and 1.14, as agreed by the community, to only support the last 4 stable Flink minor versions.

Cheers,
Gyula

On Sat, 21 Oct 2023 at 20:49, Tony Chen wrote:
> Hi Gyula,
>
> After upgrading our operator version to the HEAD commit of the release-1.6 branch (https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e), we are still seeing this same issue.
>
> Here's the log message on the last savepoint (log timestamp is in UTC):
>
>> 2023-10-21 10:21:14,023 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 87794 for job ee4f7c678794ee16506f9b41425c244e (698450687 bytes, checkpointDuration=5601 ms, finalizationTime=296 ms).
>
> 4 minutes later, a ConnectException occurred, and the jobmanager attempted to restart from the last savepoint first:
>
>> 2023-10-21 10:25:30,725 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /10.11.181.62:6122
>> 2023-10-21 10:25:30,726 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@10.11.181.62:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.11.181.62:6122]] Caused by: [java.net.ConnectException: Connection refused: /10.11.181.62:6122]
>> 2023-10-21 10:25:37,935 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.11.202.152, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
>> 2023-10-21 10:25:37,936 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job (ee4f7c678794ee16506f9b41425c244e) switched from state RESTARTING to RUNNING.
>> 2023-10-21 10:25:37,936 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job ee4f7c678794ee16506f9b41425c244e from Savepoint 87794 @ 1697883668126 for ee4f7c678794ee16506f9b41425c244e located at s3:///savepoint-ee4f7c-9c6499126fd0.
>> 2023-10-21 10:25:37,937 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
>
> However, a RecipientUnreachableException occurs, and the HA data gets cleaned up. Eventually, the Flink cluster shuts down and restarts:
>
>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] from sender [Actor[akka://flink/temp/taskmanager_0$ENE]] to recipient [Actor[akka.tcp://flink@10.11.181.62:6122/user/rpc/taskmanager_0#-43671188]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
>>     at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61) ~[flink-rpc-akka_61fdae14-7548-48be-b7c8-11190d636910.jar:1.14.5]
>> ...
>> 2023-10-21 10:25:37,946 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution 86d39b748d3655b6488fb9eaafb34f73.
>> ...
>> 2023-10-21 10:25:40,063 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data.
>> ...
>> 2023-10-21 10:25:40,170 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process KubernetesApplicationClusterEntrypoint with exit code 1443.
>> ...
>> 2023-10-21 10:25:44,631 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 2 pods from previous attempts, current attempt id is 2.
>> ...
>> 2023-10-21 10:25:44,631 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 2 workers from previous attempt.
>> ...
>> 2023-10-21 10:25:45,015 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled exception.
>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756, RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]],
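
For anyone trying this on a newer Flink version: the JobResultStore mentioned above is available from Flink 1.15 and is configured via flink-conf.yaml entries along these lines (a sketch only; the storage path is a placeholder, not taken from this thread):

```yaml
# Sketch of JobResultStore settings (Flink 1.15+); path is a placeholder.
job-result-store.storage-path: s3://my-bucket/flink/job-result-store
# Keep job result entries instead of deleting them on commit, so a
# restarted JobManager can distinguish a finished job from a lost one.
job-result-store.delete-on-commit: false
```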
Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion
Hi Gyula,

After upgrading our operator version to the HEAD commit of the release-1.6 branch (https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e), we are still seeing this same issue.

Here's the log message on the last savepoint (log timestamp is in UTC):

> 2023-10-21 10:21:14,023 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 87794 for job ee4f7c678794ee16506f9b41425c244e (698450687 bytes, checkpointDuration=5601 ms, finalizationTime=296 ms).

4 minutes later, a ConnectException occurred, and the jobmanager attempted to restart from the last savepoint first:

> 2023-10-21 10:25:30,725 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /10.11.181.62:6122
> 2023-10-21 10:25:30,726 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@10.11.181.62:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.11.181.62:6122]] Caused by: [java.net.ConnectException: Connection refused: /10.11.181.62:6122]
> 2023-10-21 10:25:37,935 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.11.202.152, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
> 2023-10-21 10:25:37,936 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job (ee4f7c678794ee16506f9b41425c244e) switched from state RESTARTING to RUNNING.
> 2023-10-21 10:25:37,936 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job ee4f7c678794ee16506f9b41425c244e from Savepoint 87794 @ 1697883668126 for ee4f7c678794ee16506f9b41425c244e located at s3:///savepoint-ee4f7c-9c6499126fd0.
> 2023-10-21 10:25:37,937 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore

However, a RecipientUnreachableException occurs, and the HA data gets cleaned up. Eventually, the Flink cluster shuts down and restarts:

> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] from sender [Actor[akka://flink/temp/taskmanager_0$ENE]] to recipient [Actor[akka.tcp://flink@10.11.181.62:6122/user/rpc/taskmanager_0#-43671188]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
>     at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61) ~[flink-rpc-akka_61fdae14-7548-48be-b7c8-11190d636910.jar:1.14.5]
> ...
> 2023-10-21 10:25:37,946 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the results produced by task execution 86d39b748d3655b6488fb9eaafb34f73.
> ...
> 2023-10-21 10:25:40,063 INFO  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data.
> ...
> 2023-10-21 10:25:40,170 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Terminating cluster entrypoint process KubernetesApplicationClusterEntrypoint with exit code 1443.
> ...
> 2023-10-21 10:25:44,631 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 2 pods from previous attempts, current attempt id is 2.
> ...
> 2023-10-21 10:25:44,631 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 2 workers from previous attempt.
> ...
> 2023-10-21 10:25:45,015 ERROR org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] - Unhandled exception.
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756, RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
> ...
> 2023-10-21 10:25:45,798 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3:blob

When the Flink cluster restarts, it doesn't try to restore from the latest savepoint anymore. Instead, it tries to restore from the savepoint in `execution.savepoint.path` in the flink-config. Since this savepoint was from a while ago, it has been disposed already, and so the Flink cluster cannot restart again:

2023-10-21
Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion
Hi,

Operator savepoint retention and savepoint upgrades have nothing to do with each other, I think. Retention only applies to periodic savepoints triggered by the operator itself.

I would upgrade to the latest 1.6.0 operator version before investigating further.

Cheers,
Gyula

On Sat, 23 Sep 2023 at 06:02, Nathan Moderwell <nathan.moderw...@robinhood.com> wrote:
> Small update on this. I see that the issue is that we use `upgradeMode: savepoint`, but have not configured the operator to retain savepoints for long enough (the previous operator we used never deleted savepoints, so we didn't run into this). I am reconfiguring to use `upgradeMode: last-state` and enabling HA to see if this gives us more stable job restoration on pod disruption.
>
> On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <nathan.moderw...@robinhood.com> wrote:
>> Hi flink-kubernetes-operator maintainers,
>>
>> We have recently migrated to the official operator and are seeing a new issue where our FlinkDeployments can fail and crashloop looking for a non-existent savepoint. On further inspection, the job is attempting to restart from the savepoint specified in execution.savepoint.path. This config is new for us (it wasn't set by our previous operator) and seems to be set automatically behind the scenes by the official operator. We see that the savepoint in execution.savepoint.path existed but gets deleted after some amount of time (in the latest example, a few hours). Then, when there is some pod disruption, the job attempts to restart from the (deleted) savepoint and starts crashlooping.
>>
>> Hoping you can help us troubleshoot and figure out whether this can be solved through configuration (we are using the equivalent configs from our previous operator, where we did not have this issue). Adding some details on versions and k8s state for your reference. Thank you for your support!
>>
>> Flink Version: 1.14.5
>> Flink Operator Version: 1.4.0
>>
>> At the time of the issue, here is the flink-config we see in the configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted from s3 at this point):
>>
>> kubernetes.jobmanager.replicas: 1
>> jobmanager.rpc.address:
>> metrics.scope.task: flink.taskmanager.job..task..metric
>> kubernetes.service-account:
>> kubernetes.cluster-id:
>> pipeline.auto-generate-uids: false
>> metrics.scope.tm: flink.taskmanager.metric
>> parallelism.default: 2
>> kubernetes.namespace:
>> metrics.reporters: prom
>> kubernetes.jobmanager.owner.reference:
>> metrics.reporter.prom.port: 9090
>> taskmanager.memory.process.size: 10G
>> kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> pipeline.name:
>> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
>> kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
>> state.backend.rocksdb.localdir: /rocksdb/
>> kubernetes.pod-template-file.taskmanager: /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
>> web.cancel.enable: false
>> execution.checkpointing.timeout: 5 min
>> kubernetes.container.image.pull-policy: IfNotPresent
>> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
>> kubernetes.jobmanager.cpu: 2.0
>> state.backend: filesystem
>> $internal.flink.version: v1_14
>> kubernetes.pod-template-file.jobmanager: /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
>> blob.server.port: 6124
>> kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:14
>> metrics.scope.operator: flink.taskmanager.job..operator..metric
>> state.savepoints.dir: s3:///savepoints
>> kubernetes.taskmanager.cpu: 2.0
>> execution.savepoint.ignore-unclaimed-state: true
>> $internal.application.program-args:
>> kubernetes.container.image:
>> taskmanager.numberOfTaskSlots: 1
>> metrics.scope.jm.job: flink.jobmanager.job..metric
>> kubernetes.rest-service.exposed.type: ClusterIP
>> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>> $internal.application.main:
>> metrics.scope.jm: flink.jobmanager.metric
>> execution.target: kubernetes-application
>> jobmanager.memory.process.size: 10G
>> metrics.scope.tm.job: flink.taskmanager.job..metric
>> taskmanager.rpc.port: 6122
>> internal.cluster.execution-mode: NORMAL
>> execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
>> pipeline.jars: local:///build/flink/usrlib/.jar
>> state.checkpoints.dir: s3:///checkpoints
>>
>> At the time of the issue, here is our FlinkDeployment Spec:
>>
>> Spec:
>>   Flink Configuration:
>>     execution.checkpointing.timeout: 5 min
>>     kubernetes.operator.job.restart.failed: true
>>     kubernetes.operator.periodic.savepoint.interval: 600s
>>     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>>     metrics.reporter.prom.port:
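
To illustrate the retention point above: periodic savepoint triggering and retention are controlled by operator keys along these lines (a sketch; the values here are examples, not recommendations):

```yaml
# Operator-side savepoint settings; values are illustrative.
# Retention applies only to the periodic savepoints the operator triggers.
kubernetes.operator.periodic.savepoint.interval: 600s
kubernetes.operator.savepoint.history.max.age: 72h
kubernetes.operator.savepoint.history.max.count: 10
```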
Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion
Small update on this. I see that the issue is that we use `upgradeMode: savepoint`, but have not configured the operator to retain savepoints for long enough (the previous operator we used never deleted savepoints, so we didn't run into this). I am reconfiguring to use `upgradeMode: last-state` and enabling HA to see if this gives us more stable job restoration on pod disruption.

On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <nathan.moderw...@robinhood.com> wrote:
> Hi flink-kubernetes-operator maintainers,
>
> We have recently migrated to the official operator and are seeing a new issue where our FlinkDeployments can fail and crashloop looking for a non-existent savepoint. On further inspection, the job is attempting to restart from the savepoint specified in execution.savepoint.path. This config is new for us (it wasn't set by our previous operator) and seems to be set automatically behind the scenes by the official operator. We see that the savepoint in execution.savepoint.path existed but gets deleted after some amount of time (in the latest example, a few hours). Then, when there is some pod disruption, the job attempts to restart from the (deleted) savepoint and starts crashlooping.
>
> Hoping you can help us troubleshoot and figure out whether this can be solved through configuration (we are using the equivalent configs from our previous operator, where we did not have this issue). Adding some details on versions and k8s state for your reference. Thank you for your support!
>
> Flink Version: 1.14.5
> Flink Operator Version: 1.4.0
>
> At the time of the issue, here is the flink-config we see in the configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted from s3 at this point):
>
> kubernetes.jobmanager.replicas: 1
> jobmanager.rpc.address:
> metrics.scope.task: flink.taskmanager.job..task..metric
> kubernetes.service-account:
> kubernetes.cluster-id:
> pipeline.auto-generate-uids: false
> metrics.scope.tm: flink.taskmanager.metric
> parallelism.default: 2
> kubernetes.namespace:
> metrics.reporters: prom
> kubernetes.jobmanager.owner.reference:
> metrics.reporter.prom.port: 9090
> taskmanager.memory.process.size: 10G
> kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> pipeline.name:
> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
> kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
> state.backend.rocksdb.localdir: /rocksdb/
> kubernetes.pod-template-file.taskmanager: /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
> web.cancel.enable: false
> execution.checkpointing.timeout: 5 min
> kubernetes.container.image.pull-policy: IfNotPresent
> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
> kubernetes.jobmanager.cpu: 2.0
> state.backend: filesystem
> $internal.flink.version: v1_14
> kubernetes.pod-template-file.jobmanager: /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
> blob.server.port: 6124
> kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:14
> metrics.scope.operator: flink.taskmanager.job..operator..metric
> state.savepoints.dir: s3:///savepoints
> kubernetes.taskmanager.cpu: 2.0
> execution.savepoint.ignore-unclaimed-state: true
> $internal.application.program-args:
> kubernetes.container.image:
> taskmanager.numberOfTaskSlots: 1
> metrics.scope.jm.job: flink.jobmanager.job..metric
> kubernetes.rest-service.exposed.type: ClusterIP
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> $internal.application.main:
> metrics.scope.jm: flink.jobmanager.metric
> execution.target: kubernetes-application
> jobmanager.memory.process.size: 10G
> metrics.scope.tm.job: flink.taskmanager.job..metric
> taskmanager.rpc.port: 6122
> internal.cluster.execution-mode: NORMAL
> execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> pipeline.jars: local:///build/flink/usrlib/.jar
> state.checkpoints.dir: s3:///checkpoints
>
> At the time of the issue, here is our FlinkDeployment Spec:
>
> Spec:
>   Flink Configuration:
>     execution.checkpointing.timeout: 5 min
>     kubernetes.operator.job.restart.failed: true
>     kubernetes.operator.periodic.savepoint.interval: 600s
>     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>     metrics.reporter.prom.port: 9090
>     metrics.reporters: prom
>     metrics.scope.jm: flink.jobmanager.metric
>     metrics.scope.jm.job: flink.jobmanager.job..metric
>     metrics.scope.operator: flink.taskmanager.job..operator..metric
>     metrics.scope.task: flink.taskmanager.job..task..metric
>     metrics.scope.tm: flink.taskmanager.metric
>     metrics.scope.tm.job: flink.taskmanager.job..metric
>     pipeline.auto-generate-uids:
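
The `last-state` setup I'm moving to looks roughly like this in the FlinkDeployment (a sketch; the name and storage paths are placeholders, not our production values):

```yaml
# Sketch of a FlinkDeployment using last-state upgrades, which restore
# from the latest checkpoint via Kubernetes HA metadata instead of
# relying on execution.savepoint.path. Paths/names are placeholders.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job
spec:
  flinkConfiguration:
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://my-bucket/flink/ha
    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
  job:
    upgradeMode: last-state
```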
Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion
Hi flink-kubernetes-operator maintainers,

We have recently migrated to the official operator and are seeing a new issue where our FlinkDeployments can fail and crashloop looking for a non-existent savepoint. On further inspection, the job is attempting to restart from the savepoint specified in execution.savepoint.path. This config is new for us (it wasn't set by our previous operator) and seems to be set automatically behind the scenes by the official operator. We see that the savepoint in execution.savepoint.path existed but gets deleted after some amount of time (in the latest example, a few hours). Then, when there is some pod disruption, the job attempts to restart from the (deleted) savepoint and starts crashlooping.

Hoping you can help us troubleshoot and figure out whether this can be solved through configuration (we are using the equivalent configs from our previous operator, where we did not have this issue). Adding some details on versions and k8s state for your reference. Thank you for your support!

Flink Version: 1.14.5
Flink Operator Version: 1.4.0

At the time of the issue, here is the flink-config we see in the configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted from s3 at this point):

kubernetes.jobmanager.replicas: 1
jobmanager.rpc.address:
metrics.scope.task: flink.taskmanager.job..task..metric
kubernetes.service-account:
kubernetes.cluster-id:
pipeline.auto-generate-uids: false
metrics.scope.tm: flink.taskmanager.metric
parallelism.default: 2
kubernetes.namespace:
metrics.reporters: prom
kubernetes.jobmanager.owner.reference:
metrics.reporter.prom.port: 9090
taskmanager.memory.process.size: 10G
kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name:
execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
state.backend.rocksdb.localdir: /rocksdb/
kubernetes.pod-template-file.taskmanager: /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
web.cancel.enable: false
execution.checkpointing.timeout: 5 min
kubernetes.container.image.pull-policy: IfNotPresent
$internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
kubernetes.jobmanager.cpu: 2.0
state.backend: filesystem
$internal.flink.version: v1_14
kubernetes.pod-template-file.jobmanager: /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
blob.server.port: 6124
kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:14
metrics.scope.operator: flink.taskmanager.job..operator..metric
state.savepoints.dir: s3:///savepoints
kubernetes.taskmanager.cpu: 2.0
execution.savepoint.ignore-unclaimed-state: true
$internal.application.program-args:
kubernetes.container.image:
taskmanager.numberOfTaskSlots: 1
metrics.scope.jm.job: flink.jobmanager.job..metric
kubernetes.rest-service.exposed.type: ClusterIP
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
$internal.application.main:
metrics.scope.jm: flink.jobmanager.metric
execution.target: kubernetes-application
jobmanager.memory.process.size: 10G
metrics.scope.tm.job: flink.taskmanager.job..metric
taskmanager.rpc.port: 6122
internal.cluster.execution-mode: NORMAL
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
pipeline.jars: local:///build/flink/usrlib/.jar
state.checkpoints.dir: s3:///checkpoints

At the time of the issue, here is our FlinkDeployment Spec:

Spec:
  Flink Configuration:
    execution.checkpointing.timeout: 5 min
    kubernetes.operator.job.restart.failed: true
    kubernetes.operator.periodic.savepoint.interval: 600s
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9090
    metrics.reporters: prom
    metrics.scope.jm: flink.jobmanager.metric
    metrics.scope.jm.job: flink.jobmanager.job..metric
    metrics.scope.operator: flink.taskmanager.job..operator..metric
    metrics.scope.task: flink.taskmanager.job..task..metric
    metrics.scope.tm: flink.taskmanager.metric
    metrics.scope.tm.job: flink.taskmanager.job..metric
    pipeline.auto-generate-uids: false
    pipeline.name:
    state.backend: filesystem
    state.backend.rocksdb.localdir: /rocksdb/
    state.checkpoints.dir: s3:///checkpoints
    state.savepoints.dir: s3:///savepoints
  Flink Version: v1_14
  Image:
  Image Pull Policy: IfNotPresent
  Job:
    Allow Non Restored State: true
    Args:
    Entry Class:
    Initial Savepoint Path: s3a:///savepoint-bad5e5-577c6a76aec5
    Jar URI: local:///build/flink/usrlib/.jar
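
For completeness, the upgrade-related part of our job spec in YAML form looks along these lines (a sketch; the savepoint path below is a placeholder, not one of the paths above):

```yaml
# Sketch of the savepoint upgrade mode we are using. In this mode the
# operator records a savepoint path that the job later restores from,
# so a savepoint disposed in the meantime leads to the crashloop above.
spec:
  job:
    upgradeMode: savepoint
    initialSavepointPath: s3a://my-bucket/savepoints/savepoint-xxxx  # placeholder
    allowNonRestoredState: true
```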