This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 305cccf6 [FLINK-33027] Allow users to override parallelism of excluded vertices 305cccf6 is described below commit 305cccf6d4cb9d4261a3f55ef71779aeaef26e14 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Mon Sep 4 14:43:26 2023 +0200 [FLINK-33027] Allow users to override parallelism of excluded vertices --- .../operator/autoscaler/JobAutoScalerImpl.java | 42 ++++++++++++- .../operator/autoscaler/JobAutoScalerImplTest.java | 73 ++++++++++++++++++++++ .../AbstractFlinkResourceReconciler.java | 35 +---------- .../reconciler/deployment/JobAutoScaler.java | 6 +- .../deployment/NoopJobAutoscalerFactory.java | 6 +- .../deployment/ApplicationReconcilerTest.java | 71 ++++----------------- 6 files changed, 128 insertions(+), 105 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 0b634d32..356f0b51 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -19,8 +19,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; +import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; @@ -32,6 +35,7 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -81,8 +85,8 @@ public class JobAutoScalerImpl implements JobAutoScaler { infoManager.removeInfoFromCache(cr); } - @Override - public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx) { + @VisibleForTesting + protected Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx) { var conf = ctx.getObserveConfig(); try { var infoOpt = infoManager.getInfo(ctx.getResource(), ctx.getKubernetesClient()); @@ -102,6 +106,40 @@ public class JobAutoScalerImpl implements JobAutoScaler { return Map.of(); } + /** + * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the spec. 
+ * + * @param ctx Resource context + */ + @Override + public void applyParallelismOverrides(FlinkResourceContext<?> ctx) { + var overrides = getParallelismOverrides(ctx); + if (overrides.isEmpty()) { + return; + } + + LOG.debug("Applying parallelism overrides: {}", overrides); + + var spec = ctx.getResource().getSpec(); + var conf = ctx.getDeployConfig(spec); + var userOverrides = new HashMap<>(conf.get(PipelineOptions.PARALLELISM_OVERRIDES)); + var exclusions = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS); + + overrides.forEach( + (k, v) -> { + // Respect user override for excluded vertices + if (exclusions.contains(k)) { + userOverrides.putIfAbsent(k, v); + } else { + userOverrides.put(k, v); + } + }); + spec.getFlinkConfiguration() + .put( + PipelineOptions.PARALLELISM_OVERRIDES.key(), + ConfigurationUtils.convertValue(userOverrides, String.class)); + } + @Override public boolean scale(FlinkResourceContext<?> ctx) { diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java index 030093e8..2b27297e 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java @@ -20,9 +20,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; @@ -48,6 +50,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.time.Instant; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -196,4 +199,74 @@ public class JobAutoScalerImplTest extends OperatorTestBase { ctx = getResourceContext(app); assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx)); } + + @Test + public void testApplyAutoscalerParallelism() { + var overrides = new HashMap<String, String>(); + var autoscaler = + new JobAutoScalerImpl(null, null, null, eventRecorder) { + public Map<String, String> getParallelismOverrides( + FlinkResourceContext<?> ctx) { + return new HashMap<>(overrides); + } + }; + + var deployment = TestUtils.buildApplicationCluster(); + var specClone = ReconciliationUtils.clone(deployment.getSpec()); + + // Verify no spec change if overrides are empty + autoscaler.applyParallelismOverrides(getResourceContext(deployment)); + assertEquals(specClone, deployment.getSpec()); + + // Make sure overrides are applied to the spec + var v1 = new JobVertexID(); + overrides.put(v1.toHexString(), "2"); + + // Verify no upgrades if overrides are empty + autoscaler.applyParallelismOverrides(getResourceContext(deployment)); + assertEquals( + Map.of(v1.toHexString(), "2"), + getResourceContext(deployment) + .getDeployConfig(deployment.getSpec()) + .get(PipelineOptions.PARALLELISM_OVERRIDES)); + + // We set a user override for v1, it should be ignored and the autoscaler override should + // take precedence + specClone = ReconciliationUtils.clone(deployment.getSpec()); + deployment + .getSpec() + .getFlinkConfiguration() + .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1"); 
+ autoscaler.applyParallelismOverrides(getResourceContext(deployment)); + assertEquals(specClone, deployment.getSpec()); + + // Define partly overlapping overrides, user overrides for new vertices should be applied + var v2 = new JobVertexID(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + v2 + ":4"); + autoscaler.applyParallelismOverrides(getResourceContext(deployment)); + assertEquals( + Map.of(v1.toString(), "2", v2.toString(), "4"), + getResourceContext(deployment) + .getDeployConfig(deployment.getSpec()) + .get(PipelineOptions.PARALLELISM_OVERRIDES)); + + // Make sure user overrides apply to excluded vertices + deployment + .getSpec() + .getFlinkConfiguration() + .put(AutoScalerOptions.VERTEX_EXCLUDE_IDS.key(), v1.toString()); + deployment + .getSpec() + .getFlinkConfiguration() + .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + v2 + ":4"); + autoscaler.applyParallelismOverrides(getResourceContext(deployment)); + assertEquals( + Map.of(v1.toString(), "1", v2.toString(), "4"), + getResourceContext(deployment) + .getDeployConfig(deployment.getSpec()) + .get(PipelineOptions.PARALLELISM_OVERRIDES)); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 0967f60a..b82932f5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -19,8 +19,6 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; import org.apache.flink.annotation.VisibleForTesting; import 
org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -56,7 +54,6 @@ import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -135,8 +132,7 @@ public abstract class AbstractFlinkResourceReconciler< SPEC lastReconciledSpec = cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); SPEC currentDeploySpec = cr.getSpec(); - applyAutoscalerParallelismOverrides( - resourceScaler.getParallelismOverrides(ctx), currentDeploySpec); + resourceScaler.applyParallelismOverrides(ctx); var specDiff = new ReflectiveDiffBuilder<>( @@ -327,35 +323,6 @@ public abstract class AbstractFlinkResourceReconciler< return false; } - /** - * If there are any parallelism overrides by the {@link JobAutoScaler} apply them to the spec. 
- * - * @param autoscalerOverrides Parallelism overrides initiated by the autoscaler - * @param spec Current user spec - */ - private void applyAutoscalerParallelismOverrides( - Map<String, String> autoscalerOverrides, SPEC spec) { - - if (autoscalerOverrides.isEmpty()) { - return; - } - - LOG.debug("Applying autoscaler parallelism overrides: {}", autoscalerOverrides); - - var configMap = spec.getFlinkConfiguration(); - var userOverridesStr = - configMap.getOrDefault(PipelineOptions.PARALLELISM_OVERRIDES.key(), ""); - var userOverrides = - new HashMap<>( - ConfigurationUtils.<Map<String, String>>convertValue( - userOverridesStr, Map.class)); - - autoscalerOverrides.forEach(userOverrides::put); - configMap.put( - PipelineOptions.PARALLELISM_OVERRIDES.key(), - ConfigurationUtils.convertValue(userOverrides, String.class)); - } - /** * Scale the cluster in-place if possible, either through reactive scaling or declarative * resources. diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java index 7d8780ff..80e1e185 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java @@ -19,8 +19,6 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; -import java.util.Map; - /** Per-job Autoscaler instance. */ public interface JobAutoScaler { @@ -30,6 +28,6 @@ public interface JobAutoScaler { /** Called when the custom resource is deleted. */ void cleanup(FlinkResourceContext<?> ctx); - /** Get the current parallelism overrides for the job. 
*/ - Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx); + /** Apply current parallelism overrides. */ + void applyParallelismOverrides(FlinkResourceContext<?> ctx); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java index f819123d..8cbd3f91 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java @@ -20,8 +20,6 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventRecorder; -import java.util.Map; - /** An autoscaler implementation which does nothing. 
*/ public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, JobAutoScaler { @@ -39,7 +37,5 @@ public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, JobAutoSc public void cleanup(FlinkResourceContext<?> ctx) {} @Override - public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> ctx) { - return Map.of(); - } + public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {} } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index d903cacc..1f03ee70 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; @@ -86,13 +87,13 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.Calendar; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; 
import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -877,19 +878,14 @@ public class ApplicationReconcilerTest extends OperatorTestBase { var ctxFactory = new TestingFlinkResourceContextFactory( configManager, operatorMetricGroup, flinkService, eventRecorder); - var overrides = new HashMap<String, String>(); + + var overrideFunction = new AtomicReference<Consumer<AbstractFlinkSpec>>(s -> {}); JobAutoScalerFactory autoscalerFactory = (r) -> new NoopJobAutoscalerFactory() { @Override - public Map<String, String> getParallelismOverrides( - FlinkResourceContext<?> ctx) { - return new HashMap<>(overrides); - } - - @Override - public boolean scale(FlinkResourceContext<?> ctx) { - return true; + public void applyParallelismOverrides(FlinkResourceContext<?> ctx) { + overrideFunction.get().accept(ctx.getResource().getSpec()); } }; @@ -906,69 +902,24 @@ public class ApplicationReconcilerTest extends OperatorTestBase { ReconciliationState.DEPLOYED, deployment.getStatus().getReconciliationStatus().getState()); assertEquals("RUNNING", deployment.getStatus().getJobStatus().getState()); - assertTrue(deployment.getStatus().isImmediateReconciliationNeeded()); - // Test when there are only overrides by the autoscaler + // Test overrides are applied correctly var v1 = new JobVertexID(); - overrides.put(v1.toHexString(), "2"); + overrideFunction.set( + s -> + s.getFlinkConfiguration() + .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2")); appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); assertEquals( ReconciliationState.UPGRADING, deployment.getStatus().getReconciliationStatus().getState()); - appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); - assertEquals( - ReconciliationState.DEPLOYED, - deployment.getStatus().getReconciliationStatus().getState()); - verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); - - assertEquals( - 
Map.of(v1.toHexString(), "2"), - ctxFactory - .getResourceContext(deployment, context) - .getObserveConfig() - .get(PipelineOptions.PARALLELISM_OVERRIDES)); - - // Test when there are also user overrides, autoscaler should take precedence - - // This should be ignored - deployment - .getSpec() - .getFlinkConfiguration() - .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1"); - appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); - assertEquals( - ReconciliationState.DEPLOYED, - deployment.getStatus().getReconciliationStatus().getState()); assertEquals( Map.of(v1.toHexString(), "2"), ctxFactory .getResourceContext(deployment, context) .getObserveConfig() .get(PipelineOptions.PARALLELISM_OVERRIDES)); - - // Define partly overlapping overrides - var v2 = new JobVertexID(); - deployment - .getSpec() - .getFlinkConfiguration() - .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1," + v2 + ":4"); - appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); - assertEquals( - ReconciliationState.UPGRADING, - deployment.getStatus().getReconciliationStatus().getState()); - appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); - assertEquals( - ReconciliationState.DEPLOYED, - deployment.getStatus().getReconciliationStatus().getState()); - verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); - - assertEquals( - Map.of(v1.toString(), "2", v2.toString(), "4"), - ctxFactory - .getResourceContext(deployment, context) - .getObserveConfig() - .get(PipelineOptions.PARALLELISM_OVERRIDES)); } @ParameterizedTest