This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new a3ef0485 [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout a3ef0485 is described below commit a3ef0485c4e871ca5572f32379e69e0f8a49f627 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Wed Mar 27 08:08:41 2024 +0100 [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout --- .../operator/service/NativeFlinkService.java | 65 +++++++------ .../operator/service/NativeFlinkServiceTest.java | 104 ++++++++++++--------- 2 files changed, 98 insertions(+), 71 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 42163124..d112bfbc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -60,6 +60,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.EditReplacePatchable; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +81,7 @@ public class NativeFlinkService extends AbstractFlinkService { private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class); private static final Deployment SCALE_TO_ZERO = new DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build(); + private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1); private final EventRecorder eventRecorder; public NativeFlinkService( @@ -164,12 +166,15 @@ public class NativeFlinkService extends AbstractFlinkService { .inNamespace(namespace) .withName(KubernetesUtils.getDeploymentName(clusterId)); - var remainingTimeout = - scaleJmToZeroBlocking( - jmDeployment, - namespace, - clusterId, - operatorConfig.getFlinkShutdownClusterTimeout()); + var remainingTimeout = operatorConfig.getFlinkShutdownClusterTimeout(); + + // We shut down the JobManager first in the (default) Foreground propagation case to have a + // cleaner exit + if (deletionPropagation == DeletionPropagation.FOREGROUND) { + remainingTimeout = + shutdownJobManagersBlocking( + jmDeployment, namespace, clusterId, remainingTimeout); + } deleteDeploymentBlocking("JobManager", jmDeployment, deletionPropagation, remainingTimeout); } @@ -306,34 +311,42 @@ public class NativeFlinkService extends AbstractFlinkService { } /** - * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped. - * This avoids race conditions between JM shutdown and TM shutdown / failure handling. + * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race + * conditions between JM shutdown and TM shutdown / failure handling. * * @param jmDeployment * @param namespace * @param clusterId - * @param timeout + * @param remainingTimeout * @return Remaining timeout after the operation. */ - private Duration scaleJmToZeroBlocking( + private Duration shutdownJobManagersBlocking( EditReplacePatchable<Deployment> jmDeployment, String namespace, String clusterId, - Duration timeout) { - return deleteBlocking( - "Scaling JobManager Deployment to zero", - () -> { - try { - jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); - } catch (Exception ignore) { - // Ignore all errors here as this is an optional step - return null; - } - return kubernetesClient - .pods() - .inNamespace(namespace) - .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); - }, - timeout); + Duration remainingTimeout) { + + // We use only half of the shutdown timeout but at most one minute as the main point + // here is to initiate JM shutdown before the TMs + var jmShutdownTimeout = + ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); + var remaining = + deleteBlocking( + "Scaling JobManager Deployment to zero", + () -> { + try { + jmDeployment.patch( + PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO); + } catch (Exception ignore) { + // Ignore all errors here as this is an optional step + return null; + } + return kubernetesClient + .pods() + .inNamespace(namespace) + .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId)); + }, + jmShutdownTimeout); + return remainingTimeout.minus(jmShutdownTimeout).plus(remaining); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index c32957b8..e4e0ad24 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -38,7 +38,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; -import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; @@ -47,22 +47,22 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders; -import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.util.concurrent.Executors; import io.fabric8.kubernetes.api.model.DeletionPropagation; -import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,7 +76,6 @@ import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLI import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -107,25 +106,19 @@ public class NativeFlinkServiceTest { executorService = Executors.newDirectExecutorService(); } - @Test - public void testDeleteClusterInternal() { - + @ParameterizedTest + @EnumSource(DeletionPropagation.class) + public void testDeleteClusterInternal(DeletionPropagation propagation) { + var timeout = Duration.ofSeconds(4); + configuration.set( + KubernetesOperatorConfigOptions.OPERATOR_RESOURCE_CLEANUP_TIMEOUT, timeout); var flinkService = new NativeFlinkService( - client, null, executorService, operatorConfig, eventRecorder) { - - @Override - protected Duration deleteDeploymentBlocking( - String name, - Resource<Deployment> deployment, - DeletionPropagation propagation, - Duration timeout) { - // Ensure deployment is scaled down before deletion - assertEquals(0, deployment.get().getSpec().getReplicas()); - return super.deleteDeploymentBlocking( - name, deployment, propagation, timeout); - } - }; + client, + null, + executorService, + FlinkOperatorConfiguration.fromConfiguration(configuration), + eventRecorder); var deployment = TestUtils.buildApplicationCluster(); ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); @@ -141,18 +134,55 @@ public class NativeFlinkServiceTest { .endSpec() .build(); client.resource(dep).create(); - assertNotNull( - client.apps() - .deployments() - .inNamespace(TestUtils.TEST_NAMESPACE) - .withName(TestUtils.TEST_DEPLOYMENT_NAME) - .get()); + var patched = new AtomicBoolean(false); + mockServer + .expect() + .patch() + .withPath( + String.format( + "/apis/apps/v1/namespaces/%s/deployments/%s", + TestUtils.TEST_NAMESPACE, TestUtils.TEST_DEPLOYMENT_NAME)) + .andReply( + 200, + req -> { + patched.set(true); + return deployment; + }) + .always(); + + // We create the JM pod explicitly here, this will block the JM scale down action + // indefinitely and we use this to verify the correct timeout enforcement + var jmPod = + new PodBuilder() + .withNewMetadata() + .withName("JM") + .withLabels( + KubernetesUtils.getJobManagerSelectors( + TestUtils.TEST_DEPLOYMENT_NAME)) + .withNamespace(TestUtils.TEST_NAMESPACE) + .endMetadata() + .build(); + client.resource(jmPod).create(); + + var start = Instant.now(); flinkService.deleteClusterInternal( deployment.getMetadata().getNamespace(), deployment.getMetadata().getName(), configManager.getObserveConfig(deployment), - DeletionPropagation.FOREGROUND); + propagation); + var measured = Duration.between(start, Instant.now()); + + // Do not scale JM deployment during orphan deletion + if (propagation == DeletionPropagation.FOREGROUND) { + assertTrue(patched.get()); + // We make sure that we dont use up the entire timeout for jm deletion + assertTrue(timeout.minus(measured).toSeconds() > 0); + // Validate that we actually waited 2 seconds + assertTrue(measured.toSeconds() > 1); + } else { + assertFalse(patched.get()); + } assertNull( client.apps() @@ -476,22 +506,6 @@ public class NativeFlinkServiceTest { scaled); } - private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo( - JobVertexID jvi, int parallelism) { - var ioMetricsInfo = new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0L, 0L, 0.); - return new JobDetailsInfo.JobVertexDetailsInfo( - jvi, - "", - 900, - parallelism, - ExecutionState.RUNNING, - 0, - 0, - 0, - Map.of(), - ioMetricsInfo); - } - @Test public void resourceRestApiTest() throws Exception { var testingClusterClient = new TestingClusterClient<String>(configuration);