This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cbcc6b67 [FLINK-33526] Autoscaler config improvement + cleanup
cbcc6b67 is described below

commit cbcc6b67c98ddfad8bd6141edfd1a6e8c2ff00f5
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Sun Nov 12 22:12:53 2023 +0100

    [FLINK-33526] Autoscaler config improvement + cleanup
---
 .../generated/auto_scaler_configuration.html       |  8 +++----
 .../org/apache/flink/autoscaler/JobAutoScaler.java | 18 ++++++++++++----
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |  1 +
 .../flink/autoscaler/config/AutoScalerOptions.java | 10 ++++-----
 .../MetricsCollectionAndEvaluationTest.java        |  2 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |  3 +--
 .../AbstractFlinkResourceReconciler.java           | 25 +++++++---------------
 .../reconciler/deployment/ReconcilerFactory.java   |  4 ----
 .../TestingFlinkDeploymentController.java          |  1 -
 9 files changed, 34 insertions(+), 38 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0b5f33a8..c3a1d798 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.catch-up.duration</h5></td>
-            <td style="word-wrap: break-word;">15 min</td>
+            <td style="word-wrap: break-word;">30 min</td>
             <td>Duration</td>
             <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
         </tr>
@@ -52,7 +52,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.window</h5></td>
-            <td style="word-wrap: break-word;">10 min</td>
+            <td style="word-wrap: break-word;">15 min</td>
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
@@ -76,7 +76,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.restart.time</h5></td>
-            <td style="word-wrap: break-word;">3 min</td>
+            <td style="word-wrap: break-word;">5 min</td>
             <td>Duration</td>
             <td>Expected restart time to be used until the operator can determine it reliably from history.</td>
         </tr>
@@ -136,7 +136,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.target.utilization.boundary</h5></td>
-            <td style="word-wrap: break-word;">0.4</td>
+            <td style="word-wrap: break-word;">0.3</td>
             <td>Double</td>
             <td>Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td>
         </tr>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
index ff2b7331..f31fac10 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
@@ -20,16 +20,26 @@ package org.apache.flink.autoscaler;
 import org.apache.flink.annotation.Internal;
 
 /**
- * The generic Autoscaler.
+ * Flink Job AutoScaler.
  *
  * @param <KEY> The job key.
+ * @param <Context> Instance of {@link JobAutoScalerContext}.
  */
 @Internal
 public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
-    /** Called as part of the reconciliation loop. */
+    /**
+     * Compute and apply new parallelism overrides for the provided job context.
+     *
+     * @param context Job context.
+     * @throws Exception
+     */
     void scale(Context context) throws Exception;
 
-    /** Called when the job is deleted. */
-    void cleanup(KEY key);
+    /**
+     * Called when the job is deleted.
+     *
+     * @param jobKey Job key.
+     */
+    void cleanup(KEY jobKey);
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index ca640371..69c646f5 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -91,6 +91,7 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
             }
 
             if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                LOG.debug("Autoscaler is waiting for stable, running state");
                 lastEvaluatedMetrics.remove(ctx.getJobKey());
                 return;
             }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e8872a35..4ac06fb1 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -60,7 +60,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> METRICS_WINDOW =
             autoScalerConfig("metrics.window")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(10))
+                    .defaultValue(Duration.ofMinutes(15))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window"))
                     .withDescription("Scaling metrics aggregation window size.");
 
@@ -82,7 +82,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.4)
+                    .defaultValue(0.3)
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization.boundary"))
                     .withDescription(
                             "Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
@@ -129,7 +129,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(15))
+                    .defaultValue(Duration.ofMinutes(30))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("catch-up.duration"))
                     .withDescription(
                             "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
@@ -137,7 +137,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> RESTART_TIME =
             autoScalerConfig("restart.time")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(3))
+                    .defaultValue(Duration.ofMinutes(5))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("restart.time"))
                     .withDescription(
                             "Expected restart time to be used until the operator can determine it reliably from history.");
@@ -234,7 +234,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
             autoScalerConfig("scaling.event.interval")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(1800))
+                    .defaultValue(Duration.ofMinutes(30))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
                     .withDescription("Time interval to resend the identical event");
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index ef84c52b..d216ddfa 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -376,7 +376,7 @@ public class MetricsCollectionAndEvaluationTest {
                 5000.,
                 evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
         assertEquals(
-                1667.,
+                1250.,
                 evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
         assertEquals(
                 500.,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3dbf4fee..0735bf33 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -166,8 +166,7 @@ public class FlinkOperator {
                 MetricManager.createFlinkDeploymentMetricManager(baseConfig, metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
         var autoscaler = AutoscalerFactory.create(client, eventRecorder);
-        var reconcilerFactory =
-                new ReconcilerFactory(configManager, eventRecorder, statusRecorder, autoscaler);
+        var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler);
         var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder);
         var canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(configManager);
         HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index dfe98a89..5bef0ba7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -138,7 +137,7 @@ public abstract class AbstractFlinkResourceReconciler<
                 cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();
 
-        scaling(ctx);
+        applyAutoscaler(ctx);
 
         var reconciliationState = reconciliationStatus.getState();
         var specDiff =
@@ -181,21 +180,13 @@ public abstract class AbstractFlinkResourceReconciler<
         }
     }
 
-    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
-        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
-
-        if (autoscalerDisabled(ctx)) {
-            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
-        } else if (autoScalerContext.getJobStatus() != JobStatus.RUNNING) {
-            LOG.info("Autoscaler is waiting for stable, running state");
-        }
-
-        autoscaler.scale(autoScalerContext);
-    }
-
-    private boolean autoscalerDisabled(FlinkResourceContext<CR> ctx) {
-        return ctx.getResource().getSpec().getJob() == null
-                || !ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+    private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
+        var autoScalerCtx = ctx.getJobAutoScalerContext();
+        boolean autoscalerEnabled =
+                ctx.getResource().getSpec().getJob() != null
+                        && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+        autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+        autoscaler.scale(autoScalerCtx);
     }
 
     private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index beba19dc..429caaa6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -37,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
 /** The factory to create reconciler based on app mode. */
 public class ReconcilerFactory {
 
-    private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
     private final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
@@ -45,11 +43,9 @@ public class ReconcilerFactory {
             reconcilerMap;
 
     public ReconcilerFactory(
-            FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
             JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
-        this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
         this.autoscaler = autoscaler;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index b174a29f..3d241b6c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -92,7 +92,6 @@ public class TestingFlinkDeploymentController
         statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter);
         reconcilerFactory =
                 new ReconcilerFactory(
-                        configManager,
                         eventRecorder,
                         statusRecorder,
                         AutoscalerFactory.create(

Reply via email to