gyfora commented on code in PR #672:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/672#discussion_r1324638095
##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -74,6 +76,36 @@ public JobAutoScalerImpl(
         this.infoManager = new AutoscalerInfoManager();
     }
+    @Override
+    public void scale(FlinkResourceContext ctx) {
+        var conf = ctx.getObserveConfig();
+        var resource = ctx.getResource();
+        var resourceId = ResourceID.fromResource(resource);
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+
+        try {
+            if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                return;
+            }
+
+            // Initialize metrics only if autoscaler is enabled
+            var status = resource.getStatus();
+            if (status.getLifecycleState() != ResourceLifecycleState.STABLE
+                    || !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+                LOG.info("Autoscaler is waiting for RUNNING job state");
+                lastEvaluatedMetrics.remove(resourceId);
+                return;
+            }
+
+            updateParallelismOverrides(ctx, conf, resource, resourceId, autoscalerMetrics);
+        } catch (Throwable e) {
+            onError(ctx, resource, autoscalerMetrics, e);
+        } finally {
+            applyParallelismOverrides(ctx);
Review Comment:
I will move the disabled check in front of the try block, where it will clear
the overrides and simply return. That way we won't actually call this method if
the autoscaler is disabled.
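
To illustrate the control flow described above, here is a minimal, self-contained sketch. It is not the PR's code: the class `ScaleFlowSketch`, the boolean parameters, the `overrides` map, and the `applyCalls` counter are hypothetical stand-ins for the Flink types, and how the real implementation clears overrides is an assumption. It only shows that a check placed before the `try` returns without ever reaching the `finally` block.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JobAutoScalerImpl; none of these members are the real API.
public class ScaleFlowSketch {
    // Stand-in for the stored parallelism overrides (assumed shape).
    final Map<String, Integer> overrides = new HashMap<>();
    // Counts how often the "apply" step runs, so the flow can be observed.
    int applyCalls = 0;

    void scale(boolean autoscalerEnabled, boolean jobRunning) {
        // Disabled check sits in front of the try block: clear overrides and return.
        // Because we return before entering try, the finally block below never runs.
        if (!autoscalerEnabled) {
            overrides.clear();
            return;
        }

        try {
            if (!jobRunning) {
                // Early return inside try: the finally block still runs.
                return;
            }
            updateParallelismOverrides();
        } catch (Throwable e) {
            onError(e);
        } finally {
            applyParallelismOverrides();
        }
    }

    // Placeholder for the real scaling decision; writes an arbitrary example value.
    void updateParallelismOverrides() {
        overrides.put("vertex-1", 4);
    }

    void onError(Throwable e) {
        System.err.println("Autoscaler error: " + e.getMessage());
    }

    void applyParallelismOverrides() {
        applyCalls++;
    }

    public static void main(String[] args) {
        ScaleFlowSketch sketch = new ScaleFlowSketch();
        sketch.overrides.put("vertex-1", 2);

        sketch.scale(false, true); // disabled: overrides cleared, apply step skipped
        System.out.println("apply calls after disabled run: " + sketch.applyCalls);

        sketch.scale(true, true); // enabled and running: update, then apply
        System.out.println("apply calls after enabled run: " + sketch.applyCalls);
    }
}
```

In the quoted diff the disabled check lives inside the `try`, so its `return` still triggers `applyParallelismOverrides(ctx)` through `finally`; hoisting the check out of the `try` is what avoids that call.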
##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
Review Comment:
good point
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org