Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
I was talking about Flink Kubernetes operator and HA , not the individual
Flink jobs. But based on your answer it’s probably not the cause

Gyula

On Fri, 26 Apr 2024 at 21:15, Maxim Senin  wrote:

> Hi, Gyula. Thanks for the tips.
>
> All jobs are deployed in a single namespace, “flink”.
>
> Which replicas? The JM replicas are already 1, I tried with TM replicas
> set to 1, but same exception happens. We have only 1 instance of the
> operator (replicas=1) in this environment.
>
> The only workarounds I discovered is either
> a) disable autoscaling for the failing job (autoscaler scales the job to
> zero for “gracefully” stopping it and then never starts it)  or
> b) some jobs that keep restarting can be fixed by disabling HA for that job
>
> And ` *Cannot rescale the given pointwise partitioner.` *is also still a
> mystery.
>
> *Thanks,*
>
> *Maxim*
>
>
>
> *From: *Gyula Fóra 
> *Date: *Friday, April 26, 2024 at 1:10 AM
> *To: *Maxim Senin 
> *Cc: *Maxim Senin via user 
> *Subject: *Re: [External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi Maxim!
>
>
>
> Regarding the status update error, it could be related to a problem that
> we have discovered recently with the Flink Operator HA. Where during a
> namespace change both leader and follower instances would start processing.
>
> It has been fixed in the current master by updating the JOSDK version to
> the one containing the fix.
>
>
>
> For details you can check:
>
> https://github.com/operator-framework/java-operator-sdk/issues/2341
>
>
> https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d
>
>
>
> To resolve the issue (if it's caused by this), you could either
> cherry-pick the fix internally to the operator or reduce the replicas to 1
> if you are using HA.
>
>
>
> Cheers,
>
> Gyula
>
>
>
>
>
>
>
> On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user <
> user@flink.apache.org> wrote:
>
> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of 

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
c093-5d8a-b5f5-2b66b4547bf6] Deleting cluster with 
Foreground propagation
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Scaling JobManager 
Deployment to zero with 300 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Scaling 
JobManager Deployment to zero
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting JobManager 
Deployment with 298 seconds timeout...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Completed Deleting 
JobManager Deployment
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Deleting Kubernetes HA 
metadata

Any ideas?

Thanks,
Maxim

From: Gyula Fóra 
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin 
Cc: Maxim Senin via user 
Subject: Re: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi Maxim!

Regarding the status update error, it could be related to a problem that we 
have discovered recently with the Flink Operator HA. Where during a namespace 
change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to the 
one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the 
fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
mailto:user@flink.apache.org>> wrote:
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user mailto:user@flink.apache.org>>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user mailto:user@flink.apache.org>>
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\&

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
Hi, Gyula. Thanks for the tips.

All jobs are deployed in a single namespace, “flink”.

Which replicas? The JM replicas are already 1, I tried with TM replicas set to 
1, but same exception happens. We have only 1 instance of the operator 
(replicas=1) in this environment.

The only workarounds I discovered is either
a) disable autoscaling for the failing job (autoscaler scales the job to zero 
for “gracefully” stopping it and then never starts it)  or
b) some jobs that keep restarting can be fixed by disabling HA for that job

And ` Cannot rescale the given pointwise partitioner.` is also still a mystery.
Thanks,
Maxim

From: Gyula Fóra 
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin 
Cc: Maxim Senin via user 
Subject: Re: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi Maxim!

Regarding the status update error, it could be related to a problem that we 
have discovered recently with the Flink Operator HA. Where during a namespace 
change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to the 
one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the 
fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
mailto:user@flink.apache.org>> wrote:
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user mailto:user@flink.apache.org>>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user mailto:user@flink.apache.org>>
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_met

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
Hi Maxim!

Regarding the status update error, it could be related to a problem that we
have discovered recently with the Flink Operator HA. Where during a
namespace change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to
the one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick
the fix internally to the operator or reduce the replicas to 1 if you are
using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
wrote:

> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of the
> deployments. This includes taking a savepoint (`takeSavepointOnUpgrade:
> true`, `upgradeMode: savepoint`) and “gracefully” shutting down the
> JobManager by “scaling it to zero” (by setting replicas = 0 in the new
> generated config).
>
> However, the deployment never comes back up, apparently, due to exception:
>
>
> 2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher *[ERROR]*
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] *Error* during error
> status handling.
>
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)*
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)*
>
> *at
> org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)*
>
> *at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)*
>
> *at
> 

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-25 Thread Maxim Senin via user
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user 
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user 
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)
at 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)
at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:123)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:417)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
2024-04-25 17:20:52,925 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during event 
processing ExecutionScope{ resource id: 
ResourceID{name='f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6', namespace='flink'}, 
version: 50606957} failed.