This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 305cccf6 [FLINK-33027] Allow users to override parallelism of excluded vertices
305cccf6 is described below

commit 305cccf6d4cb9d4261a3f55ef71779aeaef26e14
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Mon Sep 4 14:43:26 2023 +0200

    [FLINK-33027] Allow users to override parallelism of excluded vertices
---
 .../operator/autoscaler/JobAutoScalerImpl.java     | 42 ++++++++++++-
 .../operator/autoscaler/JobAutoScalerImplTest.java | 73 ++++++++++++++++++++++
 .../AbstractFlinkResourceReconciler.java           | 35 +----------
 .../reconciler/deployment/JobAutoScaler.java       |  6 +-
 .../deployment/NoopJobAutoscalerFactory.java       |  6 +-
 .../deployment/ApplicationReconcilerTest.java      | 71 ++++-----------------
 6 files changed, 128 insertions(+), 105 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 0b634d32..356f0b51 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,8 +19,11 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -32,6 +35,7 @@ import 
io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -81,8 +85,8 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         infoManager.removeInfoFromCache(cr);
     }
 
-    @Override
-    public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> 
ctx) {
+    @VisibleForTesting
+    protected Map<String, String> 
getParallelismOverrides(FlinkResourceContext<?> ctx) {
         var conf = ctx.getObserveConfig();
         try {
             var infoOpt = infoManager.getInfo(ctx.getResource(), 
ctx.getKubernetesClient());
@@ -102,6 +106,40 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         return Map.of();
     }
 
+    /**
+     * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the spec.
+     *
+     * @param ctx Resource context
+     */
+    @Override
+    public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+        var overrides = getParallelismOverrides(ctx);
+        if (overrides.isEmpty()) {
+            return;
+        }
+
+        LOG.debug("Applying parallelism overrides: {}", overrides);
+
+        var spec = ctx.getResource().getSpec();
+        var conf = ctx.getDeployConfig(spec);
+        var userOverrides = new 
HashMap<>(conf.get(PipelineOptions.PARALLELISM_OVERRIDES));
+        var exclusions = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
+        overrides.forEach(
+                (k, v) -> {
+                    // Respect user override for excluded vertices
+                    if (exclusions.contains(k)) {
+                        userOverrides.putIfAbsent(k, v);
+                    } else {
+                        userOverrides.put(k, v);
+                    }
+                });
+        spec.getFlinkConfiguration()
+                .put(
+                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
+                        ConfigurationUtils.convertValue(userOverrides, 
String.class));
+    }
+
     @Override
     public boolean scale(FlinkResourceContext<?> ctx) {
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index 030093e8..2b27297e 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -20,9 +20,11 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
@@ -48,6 +50,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -196,4 +199,74 @@ public class JobAutoScalerImplTest extends 
OperatorTestBase {
         ctx = getResourceContext(app);
         assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
     }
+
+    @Test
+    public void testApplyAutoscalerParallelism() {
+        var overrides = new HashMap<String, String>();
+        var autoscaler =
+                new JobAutoScalerImpl(null, null, null, eventRecorder) {
+                    public Map<String, String> getParallelismOverrides(
+                            FlinkResourceContext<?> ctx) {
+                        return new HashMap<>(overrides);
+                    }
+                };
+
+        var deployment = TestUtils.buildApplicationCluster();
+        var specClone = ReconciliationUtils.clone(deployment.getSpec());
+
+        // Verify no spec change if overrides are empty
+        autoscaler.applyParallelismOverrides(getResourceContext(deployment));
+        assertEquals(specClone, deployment.getSpec());
+
+        // Make sure overrides are applied to the spec
+        var v1 = new JobVertexID();
+        overrides.put(v1.toHexString(), "2");
+
+        // Verify the autoscaler override for v1 shows up in the deploy config
+        autoscaler.applyParallelismOverrides(getResourceContext(deployment));
+        assertEquals(
+                Map.of(v1.toHexString(), "2"),
+                getResourceContext(deployment)
+                        .getDeployConfig(deployment.getSpec())
+                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
+
+        // We set a user override for v1, it should be ignored and the autoscaler override should
+        // take precedence
+        specClone = ReconciliationUtils.clone(deployment.getSpec());
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");
+        autoscaler.applyParallelismOverrides(getResourceContext(deployment));
+        assertEquals(specClone, deployment.getSpec());
+
+        // Define partly overlapping overrides, user overrides for new vertices should be applied
+        var v2 = new JobVertexID();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + 
v2 + ":4");
+        autoscaler.applyParallelismOverrides(getResourceContext(deployment));
+        assertEquals(
+                Map.of(v1.toString(), "2", v2.toString(), "4"),
+                getResourceContext(deployment)
+                        .getDeployConfig(deployment.getSpec())
+                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
+
+        // Make sure user overrides apply to excluded vertices
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(AutoScalerOptions.VERTEX_EXCLUDE_IDS.key(), 
v1.toString());
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + 
v2 + ":4");
+        autoscaler.applyParallelismOverrides(getResourceContext(deployment));
+        assertEquals(
+                Map.of(v1.toString(), "1", v2.toString(), "4"),
+                getResourceContext(deployment)
+                        .getDeployConfig(deployment.getSpec())
+                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 0967f60a..b82932f5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -19,8 +19,6 @@ package 
org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -56,7 +54,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -135,8 +132,7 @@ public abstract class AbstractFlinkResourceReconciler<
         SPEC lastReconciledSpec =
                 
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();
-        applyAutoscalerParallelismOverrides(
-                resourceScaler.getParallelismOverrides(ctx), 
currentDeploySpec);
+        resourceScaler.applyParallelismOverrides(ctx);
 
         var specDiff =
                 new ReflectiveDiffBuilder<>(
@@ -327,35 +323,6 @@ public abstract class AbstractFlinkResourceReconciler<
         return false;
     }
 
-    /**
-     * If there are any parallelism overrides by the {@link JobAutoScaler} 
apply them to the spec.
-     *
-     * @param autoscalerOverrides Parallelism overrides initiated by the 
autoscaler
-     * @param spec Current user spec
-     */
-    private void applyAutoscalerParallelismOverrides(
-            Map<String, String> autoscalerOverrides, SPEC spec) {
-
-        if (autoscalerOverrides.isEmpty()) {
-            return;
-        }
-
-        LOG.debug("Applying autoscaler parallelism overrides: {}", 
autoscalerOverrides);
-
-        var configMap = spec.getFlinkConfiguration();
-        var userOverridesStr =
-                
configMap.getOrDefault(PipelineOptions.PARALLELISM_OVERRIDES.key(), "");
-        var userOverrides =
-                new HashMap<>(
-                        ConfigurationUtils.<Map<String, String>>convertValue(
-                                userOverridesStr, Map.class));
-
-        autoscalerOverrides.forEach(userOverrides::put);
-        configMap.put(
-                PipelineOptions.PARALLELISM_OVERRIDES.key(),
-                ConfigurationUtils.convertValue(userOverrides, String.class));
-    }
-
     /**
      * Scale the cluster in-place if possible, either through reactive scaling 
or declarative
      * resources.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
index 7d8780ff..80e1e185 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
@@ -19,8 +19,6 @@ package 
org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 
-import java.util.Map;
-
 /** Per-job Autoscaler instance. */
 public interface JobAutoScaler {
 
@@ -30,6 +28,6 @@ public interface JobAutoScaler {
     /** Called when the custom resource is deleted. */
     void cleanup(FlinkResourceContext<?> ctx);
 
-    /** Get the current parallelism overrides for the job. */
-    Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx);
+    /** Apply current parallelism overrides. */
+    void applyParallelismOverrides(FlinkResourceContext<?> ctx);
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
index f819123d..8cbd3f91 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
@@ -20,8 +20,6 @@ package 
org.apache.flink.kubernetes.operator.reconciler.deployment;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
-import java.util.Map;
-
 /** An autoscaler implementation which does nothing. */
 public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, 
JobAutoScaler {
 
@@ -39,7 +37,5 @@ public class NoopJobAutoscalerFactory implements 
JobAutoScalerFactory, JobAutoSc
     public void cleanup(FlinkResourceContext<?> ctx) {}
 
     @Override
-    public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> 
ctx) {
-        return Map.of();
-    }
+    public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {}
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index d903cacc..1f03ee70 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -86,13 +87,13 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -877,19 +878,14 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         var ctxFactory =
                 new TestingFlinkResourceContextFactory(
                         configManager, operatorMetricGroup, flinkService, 
eventRecorder);
-        var overrides = new HashMap<String, String>();
+
+        var overrideFunction = new 
AtomicReference<Consumer<AbstractFlinkSpec>>(s -> {});
         JobAutoScalerFactory autoscalerFactory =
                 (r) ->
                         new NoopJobAutoscalerFactory() {
                             @Override
-                            public Map<String, String> getParallelismOverrides(
-                                    FlinkResourceContext<?> ctx) {
-                                return new HashMap<>(overrides);
-                            }
-
-                            @Override
-                            public boolean scale(FlinkResourceContext<?> ctx) {
-                                return true;
+                            public void 
applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+                                
overrideFunction.get().accept(ctx.getResource().getSpec());
                             }
                         };
 
@@ -906,69 +902,24 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 ReconciliationState.DEPLOYED,
                 deployment.getStatus().getReconciliationStatus().getState());
         assertEquals("RUNNING", 
deployment.getStatus().getJobStatus().getState());
-        assertTrue(deployment.getStatus().isImmediateReconciliationNeeded());
 
-        // Test when there are only overrides by the autoscaler
+        // Test overrides are applied correctly
         var v1 = new JobVertexID();
-        overrides.put(v1.toHexString(), "2");
+        overrideFunction.set(
+                s ->
+                        s.getFlinkConfiguration()
+                                
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));
 
         appReconciler.reconcile(ctxFactory.getResourceContext(deployment, 
context));
         assertEquals(
                 ReconciliationState.UPGRADING,
                 deployment.getStatus().getReconciliationStatus().getState());
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, 
context));
-        assertEquals(
-                ReconciliationState.DEPLOYED,
-                deployment.getStatus().getReconciliationStatus().getState());
-        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
-
-        assertEquals(
-                Map.of(v1.toHexString(), "2"),
-                ctxFactory
-                        .getResourceContext(deployment, context)
-                        .getObserveConfig()
-                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
-
-        // Test when there are also user overrides, autoscaler should take 
precedence
-
-        // This should be ignored
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, 
context));
-        assertEquals(
-                ReconciliationState.DEPLOYED,
-                deployment.getStatus().getReconciliationStatus().getState());
         assertEquals(
                 Map.of(v1.toHexString(), "2"),
                 ctxFactory
                         .getResourceContext(deployment, context)
                         .getObserveConfig()
                         .get(PipelineOptions.PARALLELISM_OVERRIDES));
-
-        // Define partly overlapping overrides
-        var v2 = new JobVertexID();
-        deployment
-                .getSpec()
-                .getFlinkConfiguration()
-                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + 
v2 + ":4");
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, 
context));
-        assertEquals(
-                ReconciliationState.UPGRADING,
-                deployment.getStatus().getReconciliationStatus().getState());
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, 
context));
-        assertEquals(
-                ReconciliationState.DEPLOYED,
-                deployment.getStatus().getReconciliationStatus().getState());
-        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
-
-        assertEquals(
-                Map.of(v1.toString(), "2", v2.toString(), "4"),
-                ctxFactory
-                        .getResourceContext(deployment, context)
-                        .getObserveConfig()
-                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
     }
 
     @ParameterizedTest

Reply via email to