Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-10-21 Thread Gyula Fóra
Hi Tony,

It doesn’t seem like the operator had much to do with this error. I wonder
whether this would still happen in newer Flink versions, where the
JobResultStore is already available.
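
For reference, in Flink 1.15+ the JobResultStore can be pointed at durable
storage with roughly the options below (the path is just a placeholder and
defaults may differ, so please double check the docs for your exact version):

# Sketch only; bucket/path are placeholders (Flink 1.15+ options)
job-result-store.storage-path: s3://<bucket>/job-result-store
job-result-store.delete-on-commit: true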

It would be great to try. In any case, I highly recommend upgrading to a
newer Flink version for better operator integration and general stability.

The next operator release (1.7.0) will drop support for Flink 1.13 and 1.14,
as the community agreed to only support the last 4 stable Flink minor
versions.

Cheers
Gyula

On Sat, 21 Oct 2023 at 20:49, Tony Chen  wrote:

> Hi Gyula,
>
> After upgrading our operator version to the HEAD commit of the release-1.6
> branch (
> https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e),
> we are still seeing this same issue.
>
> Here's the log message on the last savepoint (log timestamp is in UTC):
>
> 2023-10-21 10:21:14,023 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>> Completed checkpoint 87794 for job ee4f7c678794ee16506f9b41425c244e
>> (698450687 bytes, checkpointDuration=5601 ms, finalizationTime=296 ms).
>
>
> Four minutes later, a ConnectException occurred, and the jobmanager first
> attempted to restart the job from the last savepoint:
>
> 2023-10-21 10:25:30,725 WARN  akka.remote.transport.netty.NettyTransport
>> [] - Remote connection to [null] failed with
>> java.net.ConnectException: Connection refused: /10.11.181.62:6122
>> 2023-10-21 10:25:30,726 WARN  akka.remote.ReliableDeliverySupervisor
>>   [] - Association with remote system [akka.tcp://
>> flink@10.11.181.62:6122] has failed, address is now gated for [50] ms.
>> Reason: [Association failed with [akka.tcp://flink@10.11.181.62:6122]]
>> Caused by: [java.net.ConnectException: Connection refused: /
>> 10.11.181.62:6122]
>> 2023-10-21 10:25:37,935 WARN
>>  org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
>> hostname could be resolved for the IP address 10.11.202.152, using IP
>> address as host name. Local input split assignment (such as for HDFS files)
>> may be impacted.
>> 2023-10-21 10:25:37,936 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
>>  (ee4f7c678794ee16506f9b41425c244e) switched from state
>> RESTARTING to RUNNING.
>> 2023-10-21 10:25:37,936 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>> Restoring job ee4f7c678794ee16506f9b41425c244e from Savepoint 87794 @
>> 1697883668126 for ee4f7c678794ee16506f9b41425c244e located at
>> s3:///savepoint-ee4f7c-9c6499126fd0.
>> 2023-10-21 10:25:37,937 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
>> master state to restore
>
>
> However, a RecipientUnreachableException occurs, and the HA data gets
> cleaned up. Eventually, the Flink cluster shuts down and restarts:
>
>
>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
>> Could not send message
>> [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId,
>> Time))] from sender [Actor[akka://flink/temp/taskmanager_0$ENE]] to
>> recipient [Actor[akka.tcp://
>> flink@10.11.181.62:6122/user/rpc/taskmanager_0#-43671188]], because the
>> recipient is unreachable. This can either mean that the recipient has been
>> terminated or that the remote RpcService is currently not reachable.
>> at
>> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>> ~[flink-rpc-akka_61fdae14-7548-48be-b7c8-11190d636910.jar:1.14.5]
>> ...
>> 2023-10-21 10:25:37,946 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>> Discarding the results produced by task execution
>> 86d39b748d3655b6488fb9eaafb34f73.
>> ...
>> 2023-10-21 10:25:40,063 INFO
>>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Finished cleaning up the high availability data.
>> ...
>> 2023-10-21 10:25:40,170 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>> Terminating cluster entrypoint process
>> KubernetesApplicationClusterEntrypoint with exit code 1443.
>> ...
>> 2023-10-21 10:25:44,631 INFO
>>  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
>> Recovered 2 pods from previous attempts, current attempt id is 2.
>> ...
>> 2023-10-21 10:25:44,631 INFO
>>  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> Recovered 2 workers from previous attempt.
>> ...
>> 2023-10-21 10:25:45,015 ERROR
>> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] -
>> Unhandled exception.
>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
>> Could not send message
>> [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756,
>> RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender
>> [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to
>> recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]], 

Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-10-21 Thread Tony Chen
Hi Gyula,

After upgrading our operator version to the HEAD commit of the release-1.6
branch (
https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e),
we are still seeing this same issue.

Here's the log message on the last savepoint (log timestamp is in UTC):

2023-10-21 10:21:14,023 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Completed checkpoint 87794 for job ee4f7c678794ee16506f9b41425c244e
> (698450687 bytes, checkpointDuration=5601 ms, finalizationTime=296 ms).


Four minutes later, a ConnectException occurred, and the jobmanager first
attempted to restart the job from the last savepoint:

2023-10-21 10:25:30,725 WARN  akka.remote.transport.netty.NettyTransport
> [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: /10.11.181.62:6122
> 2023-10-21 10:25:30,726 WARN  akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system [akka.tcp://
> flink@10.11.181.62:6122] has failed, address is now gated for [50] ms.
> Reason: [Association failed with [akka.tcp://flink@10.11.181.62:6122]]
> Caused by: [java.net.ConnectException: Connection refused: /
> 10.11.181.62:6122]
> 2023-10-21 10:25:37,935 WARN
>  org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No
> hostname could be resolved for the IP address 10.11.202.152, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted.
> 2023-10-21 10:25:37,936 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
>  (ee4f7c678794ee16506f9b41425c244e) switched from state
> RESTARTING to RUNNING.
> 2023-10-21 10:25:37,936 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Restoring job ee4f7c678794ee16506f9b41425c244e from Savepoint 87794 @
> 1697883668126 for ee4f7c678794ee16506f9b41425c244e located at
> s3:///savepoint-ee4f7c-9c6499126fd0.
> 2023-10-21 10:25:37,937 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
> master state to restore


However, a RecipientUnreachableException occurs, and the HA data gets
cleaned up. Eventually, the Flink cluster shuts down and restarts:


> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId,
> Time))] from sender [Actor[akka://flink/temp/taskmanager_0$ENE]] to
> recipient [Actor[akka.tcp://
> flink@10.11.181.62:6122/user/rpc/taskmanager_0#-43671188]], because the
> recipient is unreachable. This can either mean that the recipient has been
> terminated or that the remote RpcService is currently not reachable.
> at
> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
> ~[flink-rpc-akka_61fdae14-7548-48be-b7c8-11190d636910.jar:1.14.5]
> ...
> 2023-10-21 10:25:37,946 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Discarding the results produced by task execution
> 86d39b748d3655b6488fb9eaafb34f73.
> ...
> 2023-10-21 10:25:40,063 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data.
> ...
> 2023-10-21 10:25:40,170 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Terminating cluster entrypoint process
> KubernetesApplicationClusterEntrypoint with exit code 1443.
> ...
> 2023-10-21 10:25:44,631 INFO
>  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] -
> Recovered 2 pods from previous attempts, current attempt id is 2.
> ...
> 2023-10-21 10:25:44,631 INFO
>  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 2 workers from previous attempt.
> ...
> 2023-10-21 10:25:45,015 ERROR
> org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler [] -
> Unhandled exception.
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756,
> RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender
> [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to
> recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]], because
> the recipient is unreachable. This can either mean that the recipient has
> been terminated or that the remote RpcService is currently not reachable.
> ...
> 2023-10-21 10:25:45,798 INFO
>  org.apache.flink.runtime.blob.FileSystemBlobStore[] - Creating
> highly available BLOB storage directory at
> s3:blob


When the Flink cluster restarts, it no longer tries to restore from the latest
savepoint. Instead, it tries to restore from the savepoint referenced by
`execution.savepoint.path` in the flink-config. Since that savepoint was taken
a while ago and has already been disposed, the Flink cluster cannot restart:

2023-10-21 
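
For anyone debugging something similar, a rough way to check what the
restarted jobmanager will actually try to restore from is sketched below
(namespace, cluster id and bucket are placeholders, and the configmap name
assumes the default flink-config-<cluster-id> naming, which may differ in
your setup):

# Sketch only; all names in angle brackets are placeholders
kubectl -n <namespace> get configmap flink-config-<cluster-id> -o yaml \
  | grep execution.savepoint.path
# Verify the referenced savepoint still exists before the job tries to use it
aws s3 ls s3://<bucket>/<path-to>/savepoint-ee4f7c-9c6499126fd0/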

Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-09-22 Thread Gyula Fóra
Hi

Operator savepoint retention and savepoint upgrades have nothing to do with
each other, I think. Retention only applies to periodic savepoints triggered
by the operator itself.
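
If the periodic savepoints are being cleaned up sooner than you expect, the
retention window is controlled by the savepoint history options, roughly like
below (example values only; please verify the option names against the
operator docs for your version):

# Sketch only; example values
kubernetes.operator.savepoint.history.max.age: 72 h
kubernetes.operator.savepoint.history.max.count: 20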

I would upgrade to the latest 1.6.0 operator version before investigating
further.
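
A rough upgrade sketch following the quickstart pattern (the repo URL and
release name are assumptions; adjust them to match how the operator was
originally installed):

# Sketch only; repo name and URL follow the operator quickstart and may differ
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.0/
helm repo update
helm upgrade --install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator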

Cheers
Gyula


On Sat, 23 Sep 2023 at 06:02, Nathan Moderwell <
nathan.moderw...@robinhood.com> wrote:

> Small update on this. I see that the issue is that we use `upgradeMode:
> savepoint`, but have not configured the operator to retain savepoints for
> long enough (the previous operator we used never deleted savepoints so we
> didn't run into this). I am reconfiguring to use `upgradeMode: last-state`
> and enabling HA to see if this provides us more stable job restoration on
> pod disruption.
>
> On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <
> nathan.moderw...@robinhood.com> wrote:
>
>> Hi flink-kubernetes-operator maintainers,
>>
>> We have recently migrated to the official operator and are seeing a new
>> issue where our FlinkDeployments can fail and crashloop looking for a
>> non-existent savepoint. On further inspection, the job is attempting to
>> restart from the savepoint specified in execution.savepoint.path. This
>> config is new for us (it wasn't set by our previous operator) and seems
>> to be set automatically behind the scenes by the official operator. We
>> see that the savepoint in execution.savepoint.path existed but was
>> deleted after some amount of time (in the latest example, a few hours).
>> Then, when there is some pod disruption, the job attempts to restart
>> from the savepoint (which was already deleted) and starts crashlooping.
>>
>> Hoping you can help us troubleshoot and figure out if this can be solved
>> through configuration (we are using equivalent configs from our previous
>> operator where we did not have this issue). Adding some details on version
>> and k8s state for your reference. Thank you for your support!
>>
>> Flink Version: 1.14.5
>> Flink Operator Version: 1.4.0
>>
>> At the time of the issue, here is the flink-config we see in the
>> configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted
>> from s3 at this point):
>>
>> kubernetes.jobmanager.replicas: 1
>> jobmanager.rpc.address: 
>> metrics.scope.task:
>> flink.taskmanager.job..task..metric
>> kubernetes.service-account: 
>> kubernetes.cluster-id: 
>> pipeline.auto-generate-uids: false
>> metrics.scope.tm: flink.taskmanager.metric
>> parallelism.default: 2
>> kubernetes.namespace: 
>> metrics.reporters: prom
>> kubernetes.jobmanager.owner.reference: 
>> metrics.reporter.prom.port: 9090
>> taskmanager.memory.process.size: 10G
>> kubernetes.internal.jobmanager.entrypoint.class:
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> pipeline.name: 
>> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
>> kubernetes.pod-template-file:
>> /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
>> state.backend.rocksdb.localdir: /rocksdb/
>> kubernetes.pod-template-file.taskmanager:
>> /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
>> web.cancel.enable: false
>> execution.checkpointing.timeout: 5 min
>> kubernetes.container.image.pull-policy: IfNotPresent
>> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
>> kubernetes.jobmanager.cpu: 2.0
>> state.backend: filesystem
>> $internal.flink.version: v1_14
>> kubernetes.pod-template-file.jobmanager:
>> /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
>> blob.server.port: 6124
>> kubernetes.jobmanager.annotations:
>> flinkdeployment.flink.apache.org/generation:14
>> metrics.scope.operator:
>> flink.taskmanager.job..operator..metric
>> state.savepoints.dir: s3:///savepoints
>> kubernetes.taskmanager.cpu: 2.0
>> execution.savepoint.ignore-unclaimed-state: true
>> $internal.application.program-args:
>> kubernetes.container.image: 
>> taskmanager.numberOfTaskSlots: 1
>> metrics.scope.jm.job: flink.jobmanager.job..metric
>> kubernetes.rest-service.exposed.type: ClusterIP
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> $internal.application.main: 
>> metrics.scope.jm: flink.jobmanager.metric
>> execution.target: kubernetes-application
>> jobmanager.memory.process.size: 10G
>> metrics.scope.tm.job: flink.taskmanager.job..metric
>> taskmanager.rpc.port: 6122
>> internal.cluster.execution-mode: NORMAL
>> execution.checkpointing.externalized-checkpoint-retention:
>> RETAIN_ON_CANCELLATION
>> pipeline.jars: local:///build/flink/usrlib/.jar
>> state.checkpoints.dir: s3:///checkpoints
>>
>> At the time of the issue, here is our FlinkDeployment Spec:
>>
>> Spec:
>>   Flink Configuration:
>> execution.checkpointing.timeout:  5 min
>> kubernetes.operator.job.restart.failed:   true
>> kubernetes.operator.periodic.savepoint.interval:  600s
>> metrics.reporter.prom.class:
>>  org.apache.flink.metrics.prometheus.PrometheusReporter
>> metrics.reporter.prom.port: 

Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-09-22 Thread Nathan Moderwell
Small update on this. I see that the issue is that we use `upgradeMode:
savepoint`, but have not configured the operator to retain savepoints for
long enough (the previous operator we used never deleted savepoints so we
didn't run into this). I am reconfiguring to use `upgradeMode: last-state`
and enabling HA to see if this provides us more stable job restoration on
pod disruption.
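
For reference, the rough shape of the spec I'm moving towards, with
`upgradeMode: last-state` and Kubernetes HA enabled, is sketched below (image,
bucket, service account and jar path are placeholders; the fully qualified HA
factory class is the form used with Flink 1.14):

# Sketch only; values in angle brackets are placeholders
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: <deployment-name>
spec:
  image: <image>
  flinkVersion: v1_14
  flinkConfiguration:
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://<bucket>/ha
    state.checkpoints.dir: s3://<bucket>/checkpoints
    state.savepoints.dir: s3://<bucket>/savepoints
  serviceAccount: <service-account>
  job:
    jarURI: local:///build/flink/usrlib/<job>.jar
    parallelism: 2
    upgradeMode: last-state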

On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <
nathan.moderw...@robinhood.com> wrote:

> Hi flink-kubernetes-operator maintainers,
>
> We have recently migrated to the official operator and are seeing a new
> issue where our FlinkDeployments can fail and crashloop looking for a
> non-existent savepoint. On further inspection, the job is attempting to
> restart from the savepoint specified in execution.savepoint.path. This
> config is new for us (it wasn't set by our previous operator) and seems
> to be set automatically behind the scenes by the official operator. We
> see that the savepoint in execution.savepoint.path existed but was
> deleted after some amount of time (in the latest example, a few hours).
> Then, when there is some pod disruption, the job attempts to restart
> from the savepoint (which was already deleted) and starts crashlooping.
>
> Hoping you can help us troubleshoot and figure out if this can be solved
> through configuration (we are using equivalent configs from our previous
> operator where we did not have this issue). Adding some details on version
> and k8s state for your reference. Thank you for your support!
>
> Flink Version: 1.14.5
> Flink Operator Version: 1.4.0
>
> At the time of the issue, here is the flink-config we see in the configmap
> (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted from s3 at
> this point):
>
> kubernetes.jobmanager.replicas: 1
> jobmanager.rpc.address: 
> metrics.scope.task:
> flink.taskmanager.job..task..metric
> kubernetes.service-account: 
> kubernetes.cluster-id: 
> pipeline.auto-generate-uids: false
> metrics.scope.tm: flink.taskmanager.metric
> parallelism.default: 2
> kubernetes.namespace: 
> metrics.reporters: prom
> kubernetes.jobmanager.owner.reference: 
> metrics.reporter.prom.port: 9090
> taskmanager.memory.process.size: 10G
> kubernetes.internal.jobmanager.entrypoint.class:
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> pipeline.name: 
> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
> kubernetes.pod-template-file:
> /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
> state.backend.rocksdb.localdir: /rocksdb/
> kubernetes.pod-template-file.taskmanager:
> /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
> web.cancel.enable: false
> execution.checkpointing.timeout: 5 min
> kubernetes.container.image.pull-policy: IfNotPresent
> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
> kubernetes.jobmanager.cpu: 2.0
> state.backend: filesystem
> $internal.flink.version: v1_14
> kubernetes.pod-template-file.jobmanager:
> /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
> blob.server.port: 6124
> kubernetes.jobmanager.annotations:
> flinkdeployment.flink.apache.org/generation:14
> metrics.scope.operator:
> flink.taskmanager.job..operator..metric
> state.savepoints.dir: s3:///savepoints
> kubernetes.taskmanager.cpu: 2.0
> execution.savepoint.ignore-unclaimed-state: true
> $internal.application.program-args:
> kubernetes.container.image: 
> taskmanager.numberOfTaskSlots: 1
> metrics.scope.jm.job: flink.jobmanager.job..metric
> kubernetes.rest-service.exposed.type: ClusterIP
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> $internal.application.main: 
> metrics.scope.jm: flink.jobmanager.metric
> execution.target: kubernetes-application
> jobmanager.memory.process.size: 10G
> metrics.scope.tm.job: flink.taskmanager.job..metric
> taskmanager.rpc.port: 6122
> internal.cluster.execution-mode: NORMAL
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
> pipeline.jars: local:///build/flink/usrlib/.jar
> state.checkpoints.dir: s3:///checkpoints
>
> At the time of the issue, here is our FlinkDeployment Spec:
>
> Spec:
>   Flink Configuration:
> execution.checkpointing.timeout:  5 min
> kubernetes.operator.job.restart.failed:   true
> kubernetes.operator.periodic.savepoint.interval:  600s
> metrics.reporter.prom.class:
>  org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port:   9090
> metrics.reporters:prom
> metrics.scope.jm:
> flink.jobmanager.metric
> metrics.scope.jm.job:
> flink.jobmanager.job..metric
> metrics.scope.operator:
> flink.taskmanager.job..operator..metric
> metrics.scope.task:
> flink.taskmanager.job..task..metric
> metrics.scope.tm:
> flink.taskmanager.metric
> metrics.scope.tm.job:
> flink.taskmanager.job..metric
> pipeline.auto-generate-uids:   

Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-09-22 Thread Nathan Moderwell
Hi flink-kubernetes-operator maintainers,

We have recently migrated to the official operator and are seeing a new issue
where our FlinkDeployments can fail and crashloop looking for a non-existent
savepoint. On further inspection, the job is attempting to restart from the
savepoint specified in execution.savepoint.path. This config is new for us
(it wasn't set by our previous operator) and seems to be set automatically
behind the scenes by the official operator. We see that the savepoint in
execution.savepoint.path existed but was deleted after some amount of time
(in the latest example, a few hours). Then, when there is some pod
disruption, the job attempts to restart from the savepoint (which was
already deleted) and starts crashlooping.

Hoping you can help us troubleshoot and figure out if this can be solved
through configuration (we are using equivalent configs from our previous
operator where we did not have this issue). Adding some details on version
and k8s state for your reference. Thank you for your support!

Flink Version: 1.14.5
Flink Operator Version: 1.4.0

At the time of the issue, here is the flink-config we see in the configmap
(the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted from s3 at
this point):

kubernetes.jobmanager.replicas: 1
jobmanager.rpc.address: 
metrics.scope.task: flink.taskmanager.job..task..metric
kubernetes.service-account: 
kubernetes.cluster-id: 
pipeline.auto-generate-uids: false
metrics.scope.tm: flink.taskmanager.metric
parallelism.default: 2
kubernetes.namespace: 
metrics.reporters: prom
kubernetes.jobmanager.owner.reference: 
metrics.reporter.prom.port: 9090
taskmanager.memory.process.size: 10G
kubernetes.internal.jobmanager.entrypoint.class:
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
pipeline.name: 
execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
kubernetes.pod-template-file:
/tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
state.backend.rocksdb.localdir: /rocksdb/
kubernetes.pod-template-file.taskmanager:
/tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
web.cancel.enable: false
execution.checkpointing.timeout: 5 min
kubernetes.container.image.pull-policy: IfNotPresent
$internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
kubernetes.jobmanager.cpu: 2.0
state.backend: filesystem
$internal.flink.version: v1_14
kubernetes.pod-template-file.jobmanager:
/tmp/flink_op_generated_podTemplate_824610597202468981.yaml
blob.server.port: 6124
kubernetes.jobmanager.annotations:
flinkdeployment.flink.apache.org/generation:14
metrics.scope.operator:
flink.taskmanager.job..operator..metric
state.savepoints.dir: s3:///savepoints
kubernetes.taskmanager.cpu: 2.0
execution.savepoint.ignore-unclaimed-state: true
$internal.application.program-args:
kubernetes.container.image: 
taskmanager.numberOfTaskSlots: 1
metrics.scope.jm.job: flink.jobmanager.job..metric
kubernetes.rest-service.exposed.type: ClusterIP
metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter
$internal.application.main: 
metrics.scope.jm: flink.jobmanager.metric
execution.target: kubernetes-application
jobmanager.memory.process.size: 10G
metrics.scope.tm.job: flink.taskmanager.job..metric
taskmanager.rpc.port: 6122
internal.cluster.execution-mode: NORMAL
execution.checkpointing.externalized-checkpoint-retention:
RETAIN_ON_CANCELLATION
pipeline.jars: local:///build/flink/usrlib/.jar
state.checkpoints.dir: s3:///checkpoints

At the time of the issue, here is our FlinkDeployment Spec:

Spec:
  Flink Configuration:
execution.checkpointing.timeout:  5 min
kubernetes.operator.job.restart.failed:   true
kubernetes.operator.periodic.savepoint.interval:  600s
metrics.reporter.prom.class:
 org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port:   9090
metrics.reporters:prom
metrics.scope.jm:
flink.jobmanager.metric
metrics.scope.jm.job:
flink.jobmanager.job..metric
metrics.scope.operator:
flink.taskmanager.job..operator..metric
metrics.scope.task:
flink.taskmanager.job..task..metric
metrics.scope.tm:
flink.taskmanager.metric
metrics.scope.tm.job:
flink.taskmanager.job..metric
pipeline.auto-generate-uids:  false
pipeline.name:
state.backend:filesystem
state.backend.rocksdb.localdir:   /rocksdb/
state.checkpoints.dir:
 s3:///checkpoints
state.savepoints.dir:
s3:///savepoints
  Flink Version:  v1_14
  Image:  
  Image Pull Policy:  IfNotPresent
  Job:
Allow Non Restored State:  true
Args:
Entry Class: 
Initial Savepoint Path:  s3a:///savepoint-bad5e5-577c6a76aec5
Jar URI: local:///build/flink/usrlib/.jar