This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 662fa612 [FLINK-33107] Use correct upgrade mode when executing rollback, simplify rollback flow
662fa612 is described below

commit 662fa612a8ab352e43ab8a99fa61aadfbe41e4d7
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Mon Sep 25 09:12:20 2023 +0200

    [FLINK-33107] Use correct upgrade mode when executing rollback, simplify rollback flow
---
 .../operator/autoscaler/JobAutoScalerImpl.java     |   2 +-
 .../operator/reconciler/ReconciliationUtils.java   |  78 ++++++++------
 .../AbstractFlinkResourceReconciler.java           | 116 ++++++++++-----------
 .../deployment/AbstractJobReconciler.java          |   5 +-
 .../deployment/ApplicationReconciler.java          |  10 +-
 .../reconciler/deployment/SessionReconciler.java   |   4 +-
 .../operator/controller/RollbackTest.java          |   7 +-
 .../deployment/ApplicationReconcilerTest.java      |  81 ++++++++++++++
 8 files changed, 199 insertions(+), 104 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index b3013a0a..402f88bf 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -171,7 +171,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         var status = resource.getStatus();
         if (status.getLifecycleState() != ResourceLifecycleState.STABLE
                 || 
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
-            LOG.info("Autoscaler is waiting for RUNNING job state");
+            LOG.info("Autoscaler is waiting for stable, running state");
             lastEvaluatedMetrics.remove(resourceId);
             return;
         }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 5bf120bf..a4c97bec 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -125,47 +125,59 @@ public class ReconciliationUtils {
         // Clear errors
         status.setError(null);
         
reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());
-        ReconciliationState state;
-        if (status.getReconciliationStatus().getState() == 
ReconciliationState.ROLLING_BACK) {
+
+        var state = reconciliationStatus.getState();
+        if (state == ReconciliationState.ROLLING_BACK) {
             state = upgrading ? ReconciliationState.ROLLING_BACK : 
ReconciliationState.ROLLED_BACK;
         } else {
             state = upgrading ? ReconciliationState.UPGRADING : 
ReconciliationState.DEPLOYED;
         }
         reconciliationStatus.setState(state);
 
-        SPEC clonedSpec;
-        if (status.getReconciliationStatus().getState() == 
ReconciliationState.ROLLING_BACK
-                || status.getReconciliationStatus().getState() == 
ReconciliationState.ROLLED_BACK) {
-            clonedSpec = reconciliationStatus.deserializeLastReconciledSpec();
-        } else {
-            clonedSpec = ReconciliationUtils.clone(spec);
-        }
-        if (spec.getJob() != null) {
-            // For jobs we have to adjust the reconciled spec
-            var job = clonedSpec.getJob();
-            job.setState(stateAfterReconcile);
-
-            var lastSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
-            if (lastSpec != null) {
-                // We preserve the last snapshot triggers to not lose new 
triggers during upgrade
-                
job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
-                
job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce());
-            }
-
-            if (target instanceof FlinkDeployment) {
-                // For application deployments we update the taskmanager info
-                ((FlinkDeploymentStatus) status)
-                        .setTaskManager(
-                                getTaskManagerInfo(
-                                        target.getMetadata().getName(), conf, 
stateAfterReconcile));
-            }
-            reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, 
target);
-            if (spec.getJob().getState() == JobState.SUSPENDED) {
-                // When a job is suspended by the user it is automatically 
marked stable
-                reconciliationStatus.markReconciledSpecAsStable();
+        if (state == ReconciliationState.ROLLING_BACK || state == 
ReconciliationState.ROLLED_BACK) {
+            var lastSpecWithMeta = 
reconciliationStatus.deserializeLastReconciledSpecWithMeta();
+            var job = lastSpecWithMeta.getSpec().getJob();
+            if (job != null) {
+                // During the rollback we have to update the upgradeMode in 
the lastReconciledSpec
+                // based on the rollback upgradeMode, this ensures that the 
next upgrade can be
+                // executed correctly and we don't accidentally lose state.
+                job.setUpgradeMode(spec.getJob().getUpgradeMode());
+                reconciliationStatus.setLastReconciledSpec(
+                        SpecUtils.writeSpecWithMeta(
+                                lastSpecWithMeta.getSpec(), 
lastSpecWithMeta.getMeta()));
             }
         } else {
-            reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, 
target);
+            SPEC clonedSpec = ReconciliationUtils.clone(spec);
+            if (spec.getJob() != null) {
+                // For jobs we have to adjust the reconciled spec
+                var job = clonedSpec.getJob();
+                job.setState(stateAfterReconcile);
+
+                var lastSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
+                if (lastSpec != null) {
+                    // We preserve the last snapshot triggers to not lose new 
triggers during
+                    // upgrade
+                    
job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
+                    
job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce());
+                }
+
+                if (target instanceof FlinkDeployment) {
+                    // For application deployments we update the taskmanager 
info
+                    ((FlinkDeploymentStatus) status)
+                            .setTaskManager(
+                                    getTaskManagerInfo(
+                                            target.getMetadata().getName(),
+                                            conf,
+                                            stateAfterReconcile));
+                }
+                
reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
+                if (spec.getJob().getState() == JobState.SUSPENDED) {
+                    // When a job is suspended by the user it is automatically 
marked stable
+                    reconciliationStatus.markReconciledSpecAsStable();
+                }
+            } else {
+                
reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
+            }
         }
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 91edcf70..ffe636dc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -47,7 +47,6 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,6 +133,7 @@ public abstract class AbstractFlinkResourceReconciler<
         SPEC currentDeploySpec = cr.getSpec();
         resourceScaler.scale(ctx);
 
+        var reconciliationState = reconciliationStatus.getState();
         var specDiff =
                 new ReflectiveDiffBuilder<>(
                                 ctx.getDeploymentMode(), lastReconciledSpec, 
currentDeploySpec)
@@ -141,14 +141,14 @@ public abstract class AbstractFlinkResourceReconciler<
         var diffType = specDiff.getType();
 
         boolean specChanged =
-                DiffType.IGNORE != diffType
-                        || reconciliationStatus.getState() == 
ReconciliationState.UPGRADING;
+                DiffType.IGNORE != diffType || reconciliationState == 
ReconciliationState.UPGRADING;
 
-        if (reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK) {
-            specChanged = prepareCrForRollback(ctx, currentDeploySpec, 
lastReconciledSpec);
+        if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
+            prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
+            specChanged = true;
+            diffType = DiffType.UPGRADE;
         }
 
-        var observeConfig = ctx.getObserveConfig();
         if (specChanged) {
             var deployConfig = ctx.getDeployConfig(cr.getSpec());
             if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
@@ -160,7 +160,7 @@ public abstract class AbstractFlinkResourceReconciler<
             boolean scaled = diffType != DiffType.UPGRADE && scale(ctx, 
deployConfig);
 
             // Reconcile spec change unless scaling was enough
-            if (scaled || reconcileSpecChange(ctx, deployConfig)) {
+            if (scaled || reconcileSpecChange(ctx, deployConfig, 
lastReconciledSpec)) {
                 // If we executed a scale or spec upgrade action we return, 
otherwise we
                 // continue to reconcile other changes
                 return;
@@ -169,20 +169,7 @@ public abstract class AbstractFlinkResourceReconciler<
             ReconciliationUtils.updateReconciliationMetadata(cr);
         }
 
-        if (shouldRollBack(ctx, observeConfig)) {
-            // Rollbacks are executed in two steps, we initiate it first then 
return
-            if (initiateRollBack(status)) {
-                return;
-            }
-            LOG.warn(MSG_ROLLBACK);
-            eventRecorder.triggerEvent(
-                    cr,
-                    EventRecorder.Type.Normal,
-                    EventRecorder.Reason.Rollback,
-                    EventRecorder.Component.JobManagerDeployment,
-                    MSG_ROLLBACK,
-                    ctx.getKubernetesClient());
-        } else if (!reconcileOtherChanges(ctx)) {
+        if (!reconcileOtherChanges(ctx)) {
             LOG.info("Resource fully reconciled, nothing to do...");
         }
     }
@@ -243,11 +230,13 @@ public abstract class AbstractFlinkResourceReconciler<
      *
      * @param ctx Reconciliation context.
      * @param deployConfig Deployment configuration.
+     * @param lastReconciledSpec Last reconciled spec
      * @throws Exception Error during spec upgrade.
      * @return True if spec change reconciliation was executed
      */
     protected abstract boolean reconcileSpecChange(
-            FlinkResourceContext<CR> ctx, Configuration deployConfig) throws 
Exception;
+            FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC 
lastReconciledSpec)
+            throws Exception;
 
     /**
      * Reconcile any other changes required for this resource that are 
specific to the reconciler
@@ -347,17 +336,24 @@ public abstract class AbstractFlinkResourceReconciler<
      * <p>Rollbacks are only supported to previously running resource specs 
with HA enabled.
      *
      * @param ctx Reconciliation context.
-     * @param configuration Flink cluster configuration.
+     * @param specChanged Flag indicating whether the spec changed
      * @return True if the resource should be rolled back.
      */
-    private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration 
configuration) {
+    private boolean shouldRollBack(
+            FlinkResourceContext<CR> ctx, boolean specChanged, SPEC 
lastReconciledSpec) {
 
         var resource = ctx.getResource();
         var reconciliationStatus = 
resource.getStatus().getReconciliationStatus();
+        var configuration = ctx.getObserveConfig();
+
         if (reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK) {
             return true;
         }
 
+        if (specChanged) {
+            return false;
+        }
+
         if 
(!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
                 || reconciliationStatus.getState() == 
ReconciliationState.ROLLED_BACK
                 || reconciliationStatus.isLastReconciledSpecStable()) {
@@ -389,8 +385,8 @@ public abstract class AbstractFlinkResourceReconciler<
             return false;
         }
 
-        if (resource.getSpec().getJob() != null
-                && resource.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.SAVEPOINT
+        if (lastReconciledSpec.getJob() != null
+                && lastReconciledSpec.getJob().getUpgradeMode() == 
UpgradeMode.SAVEPOINT
                 && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
             // HA data not available as JM never start and relying on 
SAVEPOINT upgrade mode
             // Safe to rollback relying on savepoint
@@ -404,47 +400,49 @@ public abstract class AbstractFlinkResourceReconciler<
         return haDataAvailable;
     }
 
-    /**
-     * Initiate rollback process by changing the {@link ReconciliationState} 
in the status.
-     *
-     * @param status Resource status.
-     * @return True if a new rollback was initiated.
-     */
-    private boolean initiateRollBack(STATUS status) {
+    private void prepareCrForRollback(
+            FlinkResourceContext<CR> ctx, boolean specChanged, SPEC 
lastReconciledSpec) {
+        var cr = ctx.getResource();
+        var status = cr.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
+
         if (reconciliationStatus.getState() != 
ReconciliationState.ROLLING_BACK) {
-            LOG.warn("Preparing to roll back to last stable spec.");
-            if (StringUtils.isEmpty(status.getError())) {
-                status.setError(
-                        "Deployment is not ready within the configured 
timeout, rolling back.");
-            }
+            // When we initiate rollback we trigger a one time event
             reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
-            return true;
+            LOG.warn(MSG_ROLLBACK);
+            eventRecorder.triggerEvent(
+                    ctx.getResource(),
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.Rollback,
+                    EventRecorder.Component.JobManagerDeployment,
+                    MSG_ROLLBACK,
+                    ctx.getKubernetesClient());
+        } else {
+            if (lastReconciledSpec.getJob() != null) {
+                // The rollback SUSPENDED status is not recorded anywhere 
currently. Since the
+                // reconciler looks at the lastReconciled spec state to decide 
on the next action
+                // (cancel vs deploy) this is a simple trick to make the 
rollback flow work
+                // correctly.
+                lastReconciledSpec.getJob().setState(JobState.SUSPENDED);
+            }
         }
-        return false;
-    }
 
-    private boolean prepareCrForRollback(
-            FlinkResourceContext<CR> ctx, SPEC currentDeploySpec, SPEC 
lastReconciledSpec) {
-        var cr = ctx.getResource();
-        var reconciliationStatus = cr.getStatus().getReconciliationStatus();
-        // Spec has changed while rolling back we should apply new spec and 
move to upgrading
-        // state. Don't take in account changes on job.state as it could be 
overriden to running if
-        // the current spec is not valid
-        if (lastReconciledSpec.getJob() != null) {
-            
lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState());
-        }
-        var specDiffRollingBack =
-                new ReflectiveDiffBuilder<>(
-                                ctx.getDeploymentMode(), lastReconciledSpec, 
currentDeploySpec)
-                        .build();
-        if (DiffType.IGNORE != specDiffRollingBack.getType()) {
+        if (specChanged) {
+            // If spec has changed while rolling back we should apply new spec 
and move to upgrading
+            // state to break out of the rollback flow.
             reconciliationStatus.setState(ReconciliationState.UPGRADING);
         } else {
-            // Rely on the last stable spec if rolling back and no change in 
the spec
-            
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
+            cr.setSpec(reconciliationStatus.deserializeLastStableSpec());
+            var job = cr.getSpec().getJob();
+            if (job != null) {
+                // The last stable spec may have a completely different 
upgrade mode, then what we
+                // used the last time. We set it based on the 
lastReconciledSpec
+                job.setUpgradeMode(
+                        lastReconciledSpec.getJob().getUpgradeMode() == 
UpgradeMode.STATELESS
+                                ? UpgradeMode.STATELESS
+                                : UpgradeMode.LAST_STATE);
+            }
         }
-        return true;
     }
 
     /**
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 6db4905c..c12367ff 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -85,13 +85,12 @@ public abstract class AbstractJobReconciler<
     }
 
     @Override
-    protected boolean reconcileSpecChange(FlinkResourceContext<CR> ctx, 
Configuration deployConfig)
+    protected boolean reconcileSpecChange(
+            FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC 
lastReconciledSpec)
             throws Exception {
 
         var resource = ctx.getResource();
         STATUS status = resource.getStatus();
-        var reconciliationStatus = status.getReconciliationStatus();
-        SPEC lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
         SPEC currentDeploySpec = resource.getSpec();
 
         JobState currentJobState = lastReconciledSpec.getJob().getState();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index cc5594c2..f3719fd6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -85,9 +85,13 @@ public class ApplicationReconciler
         }
         var flinkService = ctx.getFlinkService();
 
-        if (deployConfig.getBoolean(
-                        KubernetesOperatorConfigOptions
-                                
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
+        boolean lastStateAllowed =
+                deployment.getSpec().getJob().getUpgradeMode() == 
UpgradeMode.LAST_STATE
+                        || deployConfig.getBoolean(
+                                KubernetesOperatorConfigOptions
+                                        
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
+
+        if (lastStateAllowed
                 && 
HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
                 && 
HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
                 && !flinkVersionChanged(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 1f6d2cbd..335f6259 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -60,7 +60,9 @@ public class SessionReconciler
 
     @Override
     protected boolean reconcileSpecChange(
-            FlinkResourceContext<FlinkDeployment> ctx, Configuration 
deployConfig)
+            FlinkResourceContext<FlinkDeployment> ctx,
+            Configuration deployConfig,
+            FlinkDeploymentSpec lastReconciledSpec)
             throws Exception {
         var deployment = ctx.getResource();
         deleteSessionCluster(ctx);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 667a13c8..f6ca19ae 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -374,11 +374,10 @@ public class RollbackTest {
 
         
assertFalse(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable());
         assertEquals(
-                ReconciliationState.ROLLING_BACK,
+                deployment.getSpec().getJob() != null
+                        ? ReconciliationState.ROLLING_BACK
+                        : ReconciliationState.ROLLED_BACK,
                 deployment.getStatus().getReconciliationStatus().getState());
-        assertEquals(
-                "Deployment is not ready within the configured timeout, 
rolling back.",
-                deployment.getStatus().getError());
 
         if (injectValidationError) {
             deployment.getSpec().setLogConfiguration(Map.of("invalid", 
"entry"));
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 504bf671..bc5c267b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -136,6 +136,8 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
     private FlinkOperatorConfiguration operatorConfig;
     private ExecutorService executorService;
 
+    private Clock testClock = Clock.systemDefaultZone();
+
     @Override
     public void setup() {
         appReconciler =
@@ -1159,4 +1161,83 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                         .getMetadata()
                         .getGeneration());
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testRollbackUpgradeModeHandling(boolean jmStarted) throws 
Exception {
+        var deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        offsetReconcilerClock(deployment, Duration.ZERO);
+
+        var flinkConfiguration = deployment.getSpec().getFlinkConfiguration();
+        flinkConfiguration.put(
+                
KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
+        flinkConfiguration.put(
+                
KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s");
+        flinkConfiguration.put(
+                
KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
+                        .key(),
+                "false");
+
+        // Initial deployment, mark as stable
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        
deployment.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
+
+        // Submit invalid change
+        deployment.getSpec().getJob().setParallelism(9999);
+        reconciler.reconcile(deployment, context);
+        reconciler.reconcile(deployment, context);
+        assertEquals(1, flinkService.listJobs().size());
+        assertEquals(
+                UpgradeMode.STATELESS,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastStableSpec()
+                        .getJob()
+                        .getUpgradeMode());
+        assertEquals(
+                UpgradeMode.SAVEPOINT,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getUpgradeMode());
+
+        // Trigger rollback by delaying the recovery
+        offsetReconcilerClock(deployment, Duration.ofSeconds(15));
+        flinkService.setHaDataAvailable(jmStarted);
+        flinkService.setJobManagerReady(jmStarted);
+        reconciler.reconcile(deployment, context);
+
+        assertEquals(
+                ReconciliationState.ROLLING_BACK,
+                deployment.getStatus().getReconciliationStatus().getState());
+        assertEquals(0, flinkService.listJobs().size());
+        assertEquals("FINISHED", 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(
+                jmStarted ? UpgradeMode.LAST_STATE : UpgradeMode.SAVEPOINT,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getUpgradeMode());
+
+        flinkService.setJobManagerReady(true);
+        reconciler.reconcile(deployment, context);
+
+        assertEquals(
+                ReconciliationState.ROLLED_BACK,
+                deployment.getStatus().getReconciliationStatus().getState());
+        assertEquals(1, flinkService.listJobs().size());
+        assertEquals("RECONCILING", 
deployment.getStatus().getJobStatus().getState());
+    }
+
+    private void offsetReconcilerClock(FlinkDeployment dep, Duration offset) {
+        testClock = Clock.offset(testClock, offset);
+        appReconciler.setClock(testClock);
+    }
 }

Reply via email to