This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a3ef0485 [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout
a3ef0485 is described below

commit a3ef0485c4e871ca5572f32379e69e0f8a49f627
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Wed Mar 27 08:08:41 2024 +0100

    [FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout
---
 .../operator/service/NativeFlinkService.java       |  65 +++++++------
 .../operator/service/NativeFlinkServiceTest.java   | 104 ++++++++++++---------
 2 files changed, 98 insertions(+), 71 deletions(-)
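
For readers less familiar with the Kubernetes semantics in the subject line: with Foreground
deletion propagation the API server removes dependent objects (the ReplicaSet and its pods)
before the owning Deployment itself disappears, which is presumably why a graceful JobManager
scale-down is only attempted in that mode (the test below likewise skips it for orphan
deletion). A minimal, hedged sketch of selecting the propagation policy with the fabric8
client; the namespace and deployment name are made up for illustration and this is not
operator code:

    import io.fabric8.kubernetes.api.model.DeletionPropagation;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;

    public class DeletionPropagationSketch {
        public static void main(String[] args) {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                // FOREGROUND waits for dependent pods to be deleted before the Deployment is gone;
                // BACKGROUND and ORPHAN return without waiting for the pods.
                client.apps()
                        .deployments()
                        .inNamespace("default")           // illustrative namespace
                        .withName("my-flink-cluster")     // illustrative deployment name
                        .withPropagationPolicy(DeletionPropagation.FOREGROUND)
                        .delete();
            }
        }
    }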

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 42163124..d112bfbc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -60,6 +60,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.dsl.EditReplacePatchable;
 import io.fabric8.kubernetes.client.dsl.base.PatchContext;
 import io.fabric8.kubernetes.client.dsl.base.PatchType;
+import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +81,7 @@ public class NativeFlinkService extends AbstractFlinkService {
     private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
     private static final Deployment SCALE_TO_ZERO =
             new DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build();
+    private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);
     private final EventRecorder eventRecorder;
 
     public NativeFlinkService(
@@ -164,12 +166,15 @@ public class NativeFlinkService extends AbstractFlinkService {
                         .inNamespace(namespace)
                         .withName(KubernetesUtils.getDeploymentName(clusterId));
 
-        var remainingTimeout =
-                scaleJmToZeroBlocking(
-                        jmDeployment,
-                        namespace,
-                        clusterId,
-                        operatorConfig.getFlinkShutdownClusterTimeout());
+        var remainingTimeout = operatorConfig.getFlinkShutdownClusterTimeout();
+
+        // We shut down the JobManager first in the (default) Foreground propagation case to have a
+        // cleaner exit
+        if (deletionPropagation == DeletionPropagation.FOREGROUND) {
+            remainingTimeout =
+                    shutdownJobManagersBlocking(
+                            jmDeployment, namespace, clusterId, remainingTimeout);
+        }
         deleteDeploymentBlocking("JobManager", jmDeployment, deletionPropagation, remainingTimeout);
     }
 
@@ -306,34 +311,42 @@ public class NativeFlinkService extends AbstractFlinkService {
     }
 
     /**
-     * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
-     * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
+     * Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
+     * conditions between JM shutdown and TM shutdown / failure handling.
      *
      * @param jmDeployment
      * @param namespace
      * @param clusterId
-     * @param timeout
+     * @param remainingTimeout
      * @return Remaining timeout after the operation.
      */
-    private Duration scaleJmToZeroBlocking(
+    private Duration shutdownJobManagersBlocking(
             EditReplacePatchable<Deployment> jmDeployment,
             String namespace,
             String clusterId,
-            Duration timeout) {
-        return deleteBlocking(
-                "Scaling JobManager Deployment to zero",
-                () -> {
-                    try {
-                        jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
-                    } catch (Exception ignore) {
-                        // Ignore all errors here as this is an optional step
-                        return null;
-                    }
-                    return kubernetesClient
-                            .pods()
-                            .inNamespace(namespace)
-                            .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
-                },
-                timeout);
+            Duration remainingTimeout) {
+
+        // We use only half of the shutdown timeout but at most one minute as the main point
+        // here is to initiate JM shutdown before the TMs
+        var jmShutdownTimeout =
+                ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));
+        var remaining =
+                deleteBlocking(
+                        "Scaling JobManager Deployment to zero",
+                        () -> {
+                            try {
+                                jmDeployment.patch(
+                                        PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
+                            } catch (Exception ignore) {
+                                // Ignore all errors here as this is an optional step
+                                return null;
+                            }
+                            return kubernetesClient
+                                    .pods()
+                                    .inNamespace(namespace)
+                                    .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
+                        },
+                        jmShutdownTimeout);
+        return remainingTimeout.minus(jmShutdownTimeout).plus(remaining);
     }
 }
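
To make the timeout bookkeeping above easier to follow, here is a small standalone sketch of
the same arithmetic (only java.time and commons-lang3 are assumed; the 4-second budget mirrors
the value used by the test below and is purely illustrative, not operator code):

    import java.time.Duration;
    import org.apache.commons.lang3.ObjectUtils;

    public class ShutdownTimeoutSketch {
        private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);

        public static void main(String[] args) {
            Duration remainingTimeout = Duration.ofSeconds(4); // overall cluster shutdown budget

            // JM scale-down may use at most half of the budget, capped at one minute.
            Duration jmShutdownTimeout =
                    ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2)); // PT2S

            // Worst case: the JM pods never disappear and the whole slice is consumed.
            Duration leftAfterScaleDown = Duration.ZERO;

            // Budget handed on to the Deployment deletion: 4s - 2s + 0s = 2s.
            Duration forDeletion = remainingTimeout.minus(jmShutdownTimeout).plus(leftAfterScaleDown);
            System.out.println(jmShutdownTimeout + " " + forDeletion); // prints: PT2S PT2S
        }
    }
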
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c32957b8..e4e0ad24 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
@@ -47,22 +47,22 @@ import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
-import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.util.concurrent.Executors;
 
 import io.fabric8.kubernetes.api.model.DeletionPropagation;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.dsl.Resource;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -76,7 +76,6 @@ import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLI
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -107,25 +106,19 @@ public class NativeFlinkServiceTest {
         executorService = Executors.newDirectExecutorService();
     }
 
-    @Test
-    public void testDeleteClusterInternal() {
-
+    @ParameterizedTest
+    @EnumSource(DeletionPropagation.class)
+    public void testDeleteClusterInternal(DeletionPropagation propagation) {
+        var timeout = Duration.ofSeconds(4);
+        configuration.set(
+                KubernetesOperatorConfigOptions.OPERATOR_RESOURCE_CLEANUP_TIMEOUT, timeout);
         var flinkService =
                 new NativeFlinkService(
-                        client, null, executorService, operatorConfig, eventRecorder) {
-
-                    @Override
-                    protected Duration deleteDeploymentBlocking(
-                            String name,
-                            Resource<Deployment> deployment,
-                            DeletionPropagation propagation,
-                            Duration timeout) {
-                        // Ensure deployment is scaled down before deletion
-                        assertEquals(0, deployment.get().getSpec().getReplicas());
-                        return super.deleteDeploymentBlocking(
-                                name, deployment, propagation, timeout);
-                    }
-                };
+                        client,
+                        null,
+                        executorService,
+                        FlinkOperatorConfiguration.fromConfiguration(configuration),
+                        eventRecorder);
 
         var deployment = TestUtils.buildApplicationCluster();
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
@@ -141,18 +134,55 @@ public class NativeFlinkServiceTest {
                         .endSpec()
                         .build();
         client.resource(dep).create();
-        assertNotNull(
-                client.apps()
-                        .deployments()
-                        .inNamespace(TestUtils.TEST_NAMESPACE)
-                        .withName(TestUtils.TEST_DEPLOYMENT_NAME)
-                        .get());
 
+        var patched = new AtomicBoolean(false);
+        mockServer
+                .expect()
+                .patch()
+                .withPath(
+                        String.format(
+                                "/apis/apps/v1/namespaces/%s/deployments/%s",
+                                TestUtils.TEST_NAMESPACE, TestUtils.TEST_DEPLOYMENT_NAME))
+                .andReply(
+                        200,
+                        req -> {
+                            patched.set(true);
+                            return deployment;
+                        })
+                .always();
+
+        // We create the JM pod explicitly here, this will block the JM scale down action
+        // indefinitely and we use this to verify the correct timeout enforcement
+        var jmPod =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withName("JM")
+                        .withLabels(
+                                KubernetesUtils.getJobManagerSelectors(
+                                        TestUtils.TEST_DEPLOYMENT_NAME))
+                        .withNamespace(TestUtils.TEST_NAMESPACE)
+                        .endMetadata()
+                        .build();
+        client.resource(jmPod).create();
+
+        var start = Instant.now();
         flinkService.deleteClusterInternal(
                 deployment.getMetadata().getNamespace(),
                 deployment.getMetadata().getName(),
                 configManager.getObserveConfig(deployment),
-                DeletionPropagation.FOREGROUND);
+                propagation);
+        var measured = Duration.between(start, Instant.now());
+
+        // Do not scale JM deployment during orphan deletion
+        if (propagation == DeletionPropagation.FOREGROUND) {
+            assertTrue(patched.get());
+            // We make sure that we dont use up the entire timeout for jm deletion
+            assertTrue(timeout.minus(measured).toSeconds() > 0);
+            // Validate that we actually waited 2 seconds
+            assertTrue(measured.toSeconds() > 1);
+        } else {
+            assertFalse(patched.get());
+        }
 
         assertNull(
                 client.apps()
@@ -476,22 +506,6 @@ public class NativeFlinkServiceTest {
                 scaled);
     }
 
-    private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo(
-            JobVertexID jvi, int parallelism) {
-        var ioMetricsInfo = new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0L, 0L, 0.);
-        return new JobDetailsInfo.JobVertexDetailsInfo(
-                jvi,
-                "",
-                900,
-                parallelism,
-                ExecutionState.RUNNING,
-                0,
-                0,
-                0,
-                Map.of(),
-                ioMetricsInfo);
-    }
-
     @Test
     public void resourceRestApiTest() throws Exception {
         var testingClusterClient = new TestingClusterClient<String>(configuration);

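The updated test verifies the timeout behaviour purely by measuring wall-clock time around the
blocking call. A generic, self-contained sketch of that pattern (JUnit 5 assumed; the helper
name and parameters are illustrative and not part of the operator test):

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.time.Duration;
    import java.time.Instant;

    class TimeoutEnforcementSketch {
        /** Runs a blocking call and asserts it waited, but stayed under the configured budget. */
        static void assertBoundedBlocking(Runnable blockingCall, Duration budget, Duration minWait) {
            Instant start = Instant.now();
            blockingCall.run();
            Duration measured = Duration.between(start, Instant.now());

            // The call must leave part of the budget for the remaining cleanup steps...
            assertTrue(measured.compareTo(budget) < 0);
            // ...while still having actually waited for the graceful shutdown attempt.
            assertTrue(measured.compareTo(minWait) >= 0);
        }
    }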