This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 662fa612 [FLINK-33107] Use correct upgrade mode when executing rollback, simplify rollback flow 662fa612 is described below commit 662fa612a8ab352e43ab8a99fa61aadfbe41e4d7 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Mon Sep 25 09:12:20 2023 +0200 [FLINK-33107] Use correct upgrade mode when executing rollback, simplify rollback flow --- .../operator/autoscaler/JobAutoScalerImpl.java | 2 +- .../operator/reconciler/ReconciliationUtils.java | 78 ++++++++------ .../AbstractFlinkResourceReconciler.java | 116 ++++++++++----------- .../deployment/AbstractJobReconciler.java | 5 +- .../deployment/ApplicationReconciler.java | 10 +- .../reconciler/deployment/SessionReconciler.java | 4 +- .../operator/controller/RollbackTest.java | 7 +- .../deployment/ApplicationReconcilerTest.java | 81 ++++++++++++++ 8 files changed, 199 insertions(+), 104 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index b3013a0a..402f88bf 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -171,7 +171,7 @@ public class JobAutoScalerImpl implements JobAutoScaler { var status = resource.getStatus(); if (status.getLifecycleState() != ResourceLifecycleState.STABLE || !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) { - LOG.info("Autoscaler is waiting for RUNNING job state"); + LOG.info("Autoscaler is waiting for stable, running state"); lastEvaluatedMetrics.remove(resourceId); return; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index 5bf120bf..a4c97bec 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -125,47 +125,59 @@ public class ReconciliationUtils { // Clear errors status.setError(null); reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli()); - ReconciliationState state; - if (status.getReconciliationStatus().getState() == ReconciliationState.ROLLING_BACK) { + + var state = reconciliationStatus.getState(); + if (state == ReconciliationState.ROLLING_BACK) { state = upgrading ? ReconciliationState.ROLLING_BACK : ReconciliationState.ROLLED_BACK; } else { state = upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED; } reconciliationStatus.setState(state); - SPEC clonedSpec; - if (status.getReconciliationStatus().getState() == ReconciliationState.ROLLING_BACK - || status.getReconciliationStatus().getState() == ReconciliationState.ROLLED_BACK) { - clonedSpec = reconciliationStatus.deserializeLastReconciledSpec(); - } else { - clonedSpec = ReconciliationUtils.clone(spec); - } - if (spec.getJob() != null) { - // For jobs we have to adjust the reconciled spec - var job = clonedSpec.getJob(); - job.setState(stateAfterReconcile); - - var lastSpec = reconciliationStatus.deserializeLastReconciledSpec(); - if (lastSpec != null) { - // We preserve the last snapshot triggers to not lose new triggers during upgrade - job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce()); - job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce()); - } - - if (target instanceof FlinkDeployment) { - // For application deployments we update the taskmanager info - ((FlinkDeploymentStatus) status) - .setTaskManager( - getTaskManagerInfo( - target.getMetadata().getName(), conf, stateAfterReconcile)); - } - reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target); - if (spec.getJob().getState() == JobState.SUSPENDED) { - // When a job is suspended by the user it is automatically marked stable - reconciliationStatus.markReconciledSpecAsStable(); + if (state == ReconciliationState.ROLLING_BACK || state == ReconciliationState.ROLLED_BACK) { + var lastSpecWithMeta = reconciliationStatus.deserializeLastReconciledSpecWithMeta(); + var job = lastSpecWithMeta.getSpec().getJob(); + if (job != null) { + // During the rollback we have to update the upgradeMode in the lastReconciledSpec + // based on the rollback upgradeMode, this ensures that the next upgrade can be + // executed correctly and we don't accidentally lose state. + job.setUpgradeMode(spec.getJob().getUpgradeMode()); + reconciliationStatus.setLastReconciledSpec( + SpecUtils.writeSpecWithMeta( + lastSpecWithMeta.getSpec(), lastSpecWithMeta.getMeta())); } } else { - reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target); + SPEC clonedSpec = ReconciliationUtils.clone(spec); + if (spec.getJob() != null) { + // For jobs we have to adjust the reconciled spec + var job = clonedSpec.getJob(); + job.setState(stateAfterReconcile); + + var lastSpec = reconciliationStatus.deserializeLastReconciledSpec(); + if (lastSpec != null) { + // We preserve the last snapshot triggers to not lose new triggers during + // upgrade + job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce()); + job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce()); + } + + if (target instanceof FlinkDeployment) { + // For application deployments we update the taskmanager info + ((FlinkDeploymentStatus) status) + .setTaskManager( + getTaskManagerInfo( + target.getMetadata().getName(), + conf, + stateAfterReconcile)); + } + reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target); + if (spec.getJob().getState() == JobState.SUSPENDED) { + // When a job is suspended by the user it is automatically marked stable + reconciliationStatus.markReconciledSpecAsStable(); + } + } else { + reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target); + } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 91edcf70..ffe636dc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,6 +133,7 @@ public abstract class AbstractFlinkResourceReconciler< SPEC currentDeploySpec = cr.getSpec(); resourceScaler.scale(ctx); + var reconciliationState = reconciliationStatus.getState(); var specDiff = new ReflectiveDiffBuilder<>( ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec) @@ -141,14 +141,14 @@ public abstract class AbstractFlinkResourceReconciler< var diffType = specDiff.getType(); boolean specChanged = - DiffType.IGNORE != diffType - || reconciliationStatus.getState() == ReconciliationState.UPGRADING; + DiffType.IGNORE != diffType || reconciliationState == ReconciliationState.UPGRADING; - if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { - specChanged = prepareCrForRollback(ctx, currentDeploySpec, lastReconciledSpec); + if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) { + prepareCrForRollback(ctx, specChanged, lastReconciledSpec); + specChanged = true; + diffType = DiffType.UPGRADE; } - var observeConfig = ctx.getObserveConfig(); if (specChanged) { var deployConfig = ctx.getDeployConfig(cr.getSpec()); if (checkNewSpecAlreadyDeployed(cr, deployConfig)) { @@ -160,7 +160,7 @@ public abstract class AbstractFlinkResourceReconciler< boolean scaled = diffType != DiffType.UPGRADE && scale(ctx, deployConfig); // Reconcile spec change unless scaling was enough - if (scaled || reconcileSpecChange(ctx, deployConfig)) { + if (scaled || reconcileSpecChange(ctx, deployConfig, lastReconciledSpec)) { // If we executed a scale or spec upgrade action we return, otherwise we // continue to reconcile other changes return; @@ -169,20 +169,7 @@ public abstract class AbstractFlinkResourceReconciler< ReconciliationUtils.updateReconciliationMetadata(cr); } - if (shouldRollBack(ctx, observeConfig)) { - // Rollbacks are executed in two steps, we initiate it first then return - if (initiateRollBack(status)) { - return; - } - LOG.warn(MSG_ROLLBACK); - eventRecorder.triggerEvent( - cr, - EventRecorder.Type.Normal, - EventRecorder.Reason.Rollback, - EventRecorder.Component.JobManagerDeployment, - MSG_ROLLBACK, - ctx.getKubernetesClient()); - } else if (!reconcileOtherChanges(ctx)) { + if (!reconcileOtherChanges(ctx)) { LOG.info("Resource fully reconciled, nothing to do..."); } } @@ -243,11 +230,13 @@ public abstract class AbstractFlinkResourceReconciler< * * @param ctx Reconciliation context. * @param deployConfig Deployment configuration. + * @param lastReconciledSpec Last reconciled spec * @throws Exception Error during spec upgrade. * @return True if spec change reconciliation was executed */ protected abstract boolean reconcileSpecChange( - FlinkResourceContext<CR> ctx, Configuration deployConfig) throws Exception; + FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC lastReconciledSpec) + throws Exception; /** * Reconcile any other changes required for this resource that are specific to the reconciler @@ -347,17 +336,24 @@ public abstract class AbstractFlinkResourceReconciler< * <p>Rollbacks are only supported to previously running resource specs with HA enabled. * * @param ctx Reconciliation context. - * @param configuration Flink cluster configuration. + * @param specChanged Flag indicating whether the spec changed * @return True if the resource should be rolled back. */ - private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration configuration) { + private boolean shouldRollBack( + FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) { var resource = ctx.getResource(); var reconciliationStatus = resource.getStatus().getReconciliationStatus(); + var configuration = ctx.getObserveConfig(); + if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { return true; } + if (specChanged) { + return false; + } + if (!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED) || reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK || reconciliationStatus.isLastReconciledSpecStable()) { @@ -389,8 +385,8 @@ public abstract class AbstractFlinkResourceReconciler< return false; } - if (resource.getSpec().getJob() != null - && resource.getSpec().getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT + if (lastReconciledSpec.getJob() != null + && lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.SAVEPOINT && FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) { // HA data not available as JM never start and relying on SAVEPOINT upgrade mode // Safe to rollback relying on savepoint @@ -404,47 +400,49 @@ public abstract class AbstractFlinkResourceReconciler< return haDataAvailable; } - /** - * Initiate rollback process by changing the {@link ReconciliationState} in the status. - * - * @param status Resource status. - * @return True if a new rollback was initiated. - */ - private boolean initiateRollBack(STATUS status) { + private void prepareCrForRollback( + FlinkResourceContext<CR> ctx, boolean specChanged, SPEC lastReconciledSpec) { + var cr = ctx.getResource(); + var status = cr.getStatus(); var reconciliationStatus = status.getReconciliationStatus(); + if (reconciliationStatus.getState() != ReconciliationState.ROLLING_BACK) { - LOG.warn("Preparing to roll back to last stable spec."); - if (StringUtils.isEmpty(status.getError())) { - status.setError( - "Deployment is not ready within the configured timeout, rolling back."); - } + // When we initiate rollback we trigger a one time event reconciliationStatus.setState(ReconciliationState.ROLLING_BACK); - return true; + LOG.warn(MSG_ROLLBACK); + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Normal, + EventRecorder.Reason.Rollback, + EventRecorder.Component.JobManagerDeployment, + MSG_ROLLBACK, + ctx.getKubernetesClient()); + } else { + if (lastReconciledSpec.getJob() != null) { + // The rollback SUSPENDED status is not recorded anywhere currently. Since the + // reconciler looks at the lastReconciled spec state to decide on the next action + // (cancel vs deploy) this is a simple trick to make the rollback flow work + // correctly. + lastReconciledSpec.getJob().setState(JobState.SUSPENDED); + } } - return false; - } - private boolean prepareCrForRollback( - FlinkResourceContext<CR> ctx, SPEC currentDeploySpec, SPEC lastReconciledSpec) { - var cr = ctx.getResource(); - var reconciliationStatus = cr.getStatus().getReconciliationStatus(); - // Spec has changed while rolling back we should apply new spec and move to upgrading - // state. Don't take in account changes on job.state as it could be overriden to running if - // the current spec is not valid - if (lastReconciledSpec.getJob() != null) { - lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState()); - } - var specDiffRollingBack = - new ReflectiveDiffBuilder<>( - ctx.getDeploymentMode(), lastReconciledSpec, currentDeploySpec) - .build(); - if (DiffType.IGNORE != specDiffRollingBack.getType()) { + if (specChanged) { + // If spec has changed while rolling back we should apply new spec and move to upgrading + // state to break out of the rollback flow. reconciliationStatus.setState(ReconciliationState.UPGRADING); } else { - // Rely on the last stable spec if rolling back and no change in the spec - cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec()); + cr.setSpec(reconciliationStatus.deserializeLastStableSpec()); + var job = cr.getSpec().getJob(); + if (job != null) { + // The last stable spec may have a completely different upgrade mode, then what we + // used the last time. We set it based on the lastReconciledSpec + job.setUpgradeMode( + lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.STATELESS + ? UpgradeMode.STATELESS + : UpgradeMode.LAST_STATE); + } } - return true; } /** diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 6db4905c..c12367ff 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -85,13 +85,12 @@ public abstract class AbstractJobReconciler< } @Override - protected boolean reconcileSpecChange(FlinkResourceContext<CR> ctx, Configuration deployConfig) + protected boolean reconcileSpecChange( + FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC lastReconciledSpec) throws Exception { var resource = ctx.getResource(); STATUS status = resource.getStatus(); - var reconciliationStatus = status.getReconciliationStatus(); - SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); SPEC currentDeploySpec = resource.getSpec(); JobState currentJobState = lastReconciledSpec.getJob().getState(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index cc5594c2..f3719fd6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -85,9 +85,13 @@ public class ApplicationReconciler } var flinkService = ctx.getFlinkService(); - if (deployConfig.getBoolean( - KubernetesOperatorConfigOptions - .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) + boolean lastStateAllowed = + deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.LAST_STATE + || deployConfig.getBoolean( + KubernetesOperatorConfigOptions + .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED); + + if (lastStateAllowed && HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig) && HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig()) && !flinkVersionChanged( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index 1f6d2cbd..335f6259 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -60,7 +60,9 @@ public class SessionReconciler @Override protected boolean reconcileSpecChange( - FlinkResourceContext<FlinkDeployment> ctx, Configuration deployConfig) + FlinkResourceContext<FlinkDeployment> ctx, + Configuration deployConfig, + FlinkDeploymentSpec lastReconciledSpec) throws Exception { var deployment = ctx.getResource(); deleteSessionCluster(ctx); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java index 667a13c8..f6ca19ae 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java @@ -374,11 +374,10 @@ public class RollbackTest { assertFalse(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable()); assertEquals( - ReconciliationState.ROLLING_BACK, + deployment.getSpec().getJob() != null + ? ReconciliationState.ROLLING_BACK + : ReconciliationState.ROLLED_BACK, deployment.getStatus().getReconciliationStatus().getState()); - assertEquals( - "Deployment is not ready within the configured timeout, rolling back.", - deployment.getStatus().getError()); if (injectValidationError) { deployment.getSpec().setLogConfiguration(Map.of("invalid", "entry")); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 504bf671..bc5c267b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -136,6 +136,8 @@ public class ApplicationReconcilerTest extends OperatorTestBase { private FlinkOperatorConfiguration operatorConfig; private ExecutorService executorService; + private Clock testClock = Clock.systemDefaultZone(); + @Override public void setup() { appReconciler = @@ -1159,4 +1161,83 @@ public class ApplicationReconcilerTest extends OperatorTestBase { .getMetadata() .getGeneration()); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRollbackUpgradeModeHandling(boolean jmStarted) throws Exception { + var deployment = TestUtils.buildApplicationCluster(); + deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); + offsetReconcilerClock(deployment, Duration.ZERO); + + var flinkConfiguration = deployment.getSpec().getFlinkConfiguration(); + flinkConfiguration.put( + KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true"); + flinkConfiguration.put( + KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s"); + flinkConfiguration.put( + KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED + .key(), + "false"); + + // Initial deployment, mark as stable + reconciler.reconcile(deployment, context); + verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); + deployment.getStatus().getReconciliationStatus().markReconciledSpecAsStable(); + + // Submit invalid change + deployment.getSpec().getJob().setParallelism(9999); + reconciler.reconcile(deployment, context); + reconciler.reconcile(deployment, context); + assertEquals(1, flinkService.listJobs().size()); + assertEquals( + UpgradeMode.STATELESS, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastStableSpec() + .getJob() + .getUpgradeMode()); + assertEquals( + UpgradeMode.SAVEPOINT, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getUpgradeMode()); + + // Trigger rollback by delaying the recovery + offsetReconcilerClock(deployment, Duration.ofSeconds(15)); + flinkService.setHaDataAvailable(jmStarted); + flinkService.setJobManagerReady(jmStarted); + reconciler.reconcile(deployment, context); + + assertEquals( + ReconciliationState.ROLLING_BACK, + deployment.getStatus().getReconciliationStatus().getState()); + assertEquals(0, flinkService.listJobs().size()); + assertEquals("FINISHED", deployment.getStatus().getJobStatus().getState()); + assertEquals( + jmStarted ? UpgradeMode.LAST_STATE : UpgradeMode.SAVEPOINT, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getUpgradeMode()); + + flinkService.setJobManagerReady(true); + reconciler.reconcile(deployment, context); + + assertEquals( + ReconciliationState.ROLLED_BACK, + deployment.getStatus().getReconciliationStatus().getState()); + assertEquals(1, flinkService.listJobs().size()); + assertEquals("RECONCILING", deployment.getStatus().getJobStatus().getState()); + } + + private void offsetReconcilerClock(FlinkDeployment dep, Duration offset) { + testClock = Clock.offset(testClock, offset); + appReconciler.setClock(testClock); + } }