This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new cbcc6b67 [FLINK-33526] Autoscaler config improvement + cleanup cbcc6b67 is described below commit cbcc6b67c98ddfad8bd6141edfd1a6e8c2ff00f5 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Sun Nov 12 22:12:53 2023 +0100 [FLINK-33526] Autoscaler config improvement + cleanup --- .../generated/auto_scaler_configuration.html | 8 +++---- .../org/apache/flink/autoscaler/JobAutoScaler.java | 18 ++++++++++++---- .../apache/flink/autoscaler/JobAutoScalerImpl.java | 1 + .../flink/autoscaler/config/AutoScalerOptions.java | 10 ++++----- .../MetricsCollectionAndEvaluationTest.java | 2 +- .../flink/kubernetes/operator/FlinkOperator.java | 3 +-- .../AbstractFlinkResourceReconciler.java | 25 +++++++--------------- .../reconciler/deployment/ReconcilerFactory.java | 4 ---- .../TestingFlinkDeploymentController.java | 1 - 9 files changed, 34 insertions(+), 38 deletions(-) diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html index 0b5f33a8..c3a1d798 100644 --- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -16,7 +16,7 @@ </tr> <tr> <td><h5>job.autoscaler.catch-up.duration</h5></td> - <td style="word-wrap: break-word;">15 min</td> + <td style="word-wrap: break-word;">30 min</td> <td>Duration</td> <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td> </tr> @@ -52,7 +52,7 @@ </tr> <tr> <td><h5>job.autoscaler.metrics.window</h5></td> - <td style="word-wrap: break-word;">10 min</td> + <td style="word-wrap: break-word;">15 min</td> <td>Duration</td> <td>Scaling metrics aggregation window size.</td> </tr> @@ -76,7 +76,7 @@ </tr> <tr> <td><h5>job.autoscaler.restart.time</h5></td> - <td style="word-wrap: break-word;">3 min</td> + <td style="word-wrap: break-word;">5 min</td> <td>Duration</td> <td>Expected restart time to be used until the operator can determine it reliably from history.</td> </tr> @@ -136,7 +136,7 @@ </tr> <tr> <td><h5>job.autoscaler.target.utilization.boundary</h5></td> - <td style="word-wrap: break-word;">0.4</td> + <td style="word-wrap: break-word;">0.3</td> <td>Double</td> <td>Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td> </tr> diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java index ff2b7331..f31fac10 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java @@ -20,16 +20,26 @@ package org.apache.flink.autoscaler; import org.apache.flink.annotation.Internal; /** - * The generic Autoscaler. + * Flink Job AutoScaler. * * @param <KEY> The job key. + * @param <Context> Instance of {@link JobAutoScalerContext}. */ @Internal public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> { - /** Called as part of the reconciliation loop. */ + /** + * Compute and apply new parallelism overrides for the provided job context. + * + * @param context Job context. + * @throws Exception + */ void scale(Context context) throws Exception; - /** Called when the job is deleted. */ - void cleanup(KEY key); + /** + * Called when the job is deleted. + * + * @param jobKey Job key. + */ + void cleanup(KEY jobKey); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index ca640371..69c646f5 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -91,6 +91,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> } if (ctx.getJobStatus() != JobStatus.RUNNING) { + LOG.debug("Autoscaler is waiting for stable, running state"); lastEvaluatedMetrics.remove(ctx.getJobKey()); return; } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index e8872a35..4ac06fb1 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -60,7 +60,7 @@ public class AutoScalerOptions { public static final ConfigOption<Duration> METRICS_WINDOW = autoScalerConfig("metrics.window") .durationType() - .defaultValue(Duration.ofMinutes(10)) + .defaultValue(Duration.ofMinutes(15)) .withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window")) .withDescription("Scaling metrics aggregation window size."); @@ -82,7 +82,7 @@ public class AutoScalerOptions { public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY = autoScalerConfig("target.utilization.boundary") .doubleType() - .defaultValue(0.4) + .defaultValue(0.3) .withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization.boundary")) .withDescription( "Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]"); @@ -129,7 +129,7 @@ public class AutoScalerOptions { public static final ConfigOption<Duration> CATCH_UP_DURATION = autoScalerConfig("catch-up.duration") .durationType() - .defaultValue(Duration.ofMinutes(15)) + .defaultValue(Duration.ofMinutes(30)) .withDeprecatedKeys(deprecatedOperatorConfigKey("catch-up.duration")) .withDescription( "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling."); @@ -137,7 +137,7 @@ public class AutoScalerOptions { public static final ConfigOption<Duration> RESTART_TIME = autoScalerConfig("restart.time") .durationType() - .defaultValue(Duration.ofMinutes(3)) + .defaultValue(Duration.ofMinutes(5)) .withDeprecatedKeys(deprecatedOperatorConfigKey("restart.time")) .withDescription( "Expected restart time to be used until the operator can determine it reliably from history."); @@ -234,7 +234,7 @@ public class AutoScalerOptions { public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") .durationType() - .defaultValue(Duration.ofSeconds(1800)) + .defaultValue(Duration.ofMinutes(30)) .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval")) .withDescription("Time interval to resend the identical event"); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index ef84c52b..d216ddfa 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -376,7 +376,7 @@ public class MetricsCollectionAndEvaluationTest { 5000., evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent()); assertEquals( - 1667., + 1250., evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent()); assertEquals( 500., diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 3dbf4fee..0735bf33 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -166,8 +166,7 @@ public class FlinkOperator { MetricManager.createFlinkDeploymentMetricManager(baseConfig, metricGroup); var statusRecorder = StatusRecorder.create(client, metricManager, listeners); var autoscaler = AutoscalerFactory.create(client, eventRecorder); - var reconcilerFactory = - new ReconcilerFactory(configManager, eventRecorder, statusRecorder, autoscaler); + var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler); var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder); var canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(configManager); HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index dfe98a89..5bef0ba7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; @@ -138,7 +137,7 @@ public abstract class AbstractFlinkResourceReconciler< cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec(); SPEC currentDeploySpec = cr.getSpec(); - scaling(ctx); + applyAutoscaler(ctx); var reconciliationState = reconciliationStatus.getState(); var specDiff = @@ -181,21 +180,13 @@ public abstract class AbstractFlinkResourceReconciler< } } - private void scaling(FlinkResourceContext<CR> ctx) throws Exception { - KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext(); - - if (autoscalerDisabled(ctx)) { - autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false); - } else if (autoScalerContext.getJobStatus() != JobStatus.RUNNING) { - LOG.info("Autoscaler is waiting for stable, running state"); - } - - autoscaler.scale(autoScalerContext); - } - - private boolean autoscalerDisabled(FlinkResourceContext<CR> ctx) { - return ctx.getResource().getSpec().getJob() == null - || !ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); + private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception { + var autoScalerCtx = ctx.getJobAutoScalerContext(); + boolean autoscalerEnabled = + ctx.getResource().getSpec().getJob() != null + && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); + autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + autoscaler.scale(autoScalerCtx); } private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java index beba19dc..429caaa6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java @@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.Mode; import org.apache.flink.kubernetes.operator.reconciler.Reconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -37,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap; /** The factory to create reconciler based on app mode. */ public class ReconcilerFactory { - private final FlinkConfigManager configManager; private final EventRecorder eventRecorder; private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder; private final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler; @@ -45,11 +43,9 @@ public class ReconcilerFactory { reconcilerMap; public ReconcilerFactory( - FlinkConfigManager configManager, EventRecorder eventRecorder, StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder, JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) { - this.configManager = configManager; this.eventRecorder = eventRecorder; this.deploymentStatusRecorder = deploymentStatusRecorder; this.autoscaler = autoscaler; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index b174a29f..3d241b6c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -92,7 +92,6 @@ public class TestingFlinkDeploymentController statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter); reconcilerFactory = new ReconcilerFactory( - configManager, eventRecorder, statusRecorder, AutoscalerFactory.create(