[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #672: [FLINK-33081] Apply parallelism overrides in scale

2023-09-13 Thread via GitHub


gyfora commented on code in PR #672:
URL: https://github.com/apache/flink-kubernetes-operator/pull/672#discussion_r1324638095


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -74,6 +76,36 @@ public JobAutoScalerImpl(
 this.infoManager = new AutoscalerInfoManager();
 }
 
+@Override
+public void scale(FlinkResourceContext ctx) {
+var conf = ctx.getObserveConfig();
+var resource = ctx.getResource();
+var resourceId = ResourceID.fromResource(resource);
+var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+
+try {
+if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
+LOG.debug("Autoscaler is disabled");
+return;
+}
+
+// Initialize metrics only if autoscaler is enabled
+var status = resource.getStatus();
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE
+|| !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+LOG.info("Autoscaler is waiting for RUNNING job state");
+lastEvaluatedMetrics.remove(resourceId);
+return;
+}
+
+updateParallelismOverrides(ctx, conf, resource, resourceId, autoscalerMetrics);
+} catch (Throwable e) {
+onError(ctx, resource, autoscalerMetrics, e);
+} finally {
+applyParallelismOverrides(ctx);

Review Comment:
I will move the disabled check in front of the `try` block; it will clear the overrides and simply return. That way `applyParallelismOverrides` won't be called at all when the autoscaler is disabled.
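A minimal sketch of the control flow proposed above, using hypothetical stand-ins: `Ctx`, its boolean flags, `clearParallelismOverrides`, and the simplified method bodies are assumptions for illustration, not the operator's actual types or API. The disabled check sits in front of the `try`, so a disabled autoscaler clears the overrides and returns without ever reaching the `finally` block.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScaleFlowSketch {

    // Hypothetical, simplified stand-in for FlinkResourceContext.
    static final class Ctx {
        final boolean autoscalerEnabled;
        final boolean jobRunning;
        final Map<String, String> overrides = new HashMap<>();
        final List<String> calls = new ArrayList<>(); // records which steps ran

        Ctx(boolean autoscalerEnabled, boolean jobRunning) {
            this.autoscalerEnabled = autoscalerEnabled;
            this.jobRunning = jobRunning;
        }
    }

    static void scale(Ctx ctx) {
        // Disabled check in front of the try block: clear the overrides and
        // return, so the finally block (applyParallelismOverrides) never runs.
        if (!ctx.autoscalerEnabled) {
            clearParallelismOverrides(ctx);
            return;
        }
        try {
            if (!ctx.jobRunning) {
                // "Waiting for RUNNING" early return; finally still applies overrides.
                return;
            }
            updateParallelismOverrides(ctx);
        } catch (Throwable e) {
            ctx.calls.add("onError"); // stands in for onError(...)
        } finally {
            applyParallelismOverrides(ctx);
        }
    }

    // Hypothetical helper: drops any previously computed overrides.
    static void clearParallelismOverrides(Ctx ctx) {
        ctx.overrides.clear();
        ctx.calls.add("clear");
    }

    // Simplified: the PR's version evaluates metrics and returns a boolean.
    static void updateParallelismOverrides(Ctx ctx) {
        ctx.overrides.put("vertex-1", "4");
        ctx.calls.add("update");
    }

    static void applyParallelismOverrides(Ctx ctx) {
        ctx.calls.add("apply");
    }

    public static void main(String[] args) {
        Ctx disabled = new Ctx(false, true);
        disabled.overrides.put("vertex-1", "2");
        scale(disabled);
        System.out.println(disabled.calls + " " + disabled.overrides); // [clear] {}

        Ctx enabled = new Ctx(true, true);
        scale(enabled);
        System.out.println(enabled.calls + " " + enabled.overrides); // [update, apply] {vertex-1=4}
    }
}
```

With this layout, the "waiting for RUNNING" early return inside the `try` still goes through the `finally` block, so overrides are applied for an enabled but not-yet-running job, while a disabled autoscaler only clears them.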



##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -74,6 +76,36 @@ public JobAutoScalerImpl(
 this.infoManager = new AutoscalerInfoManager();
 }
 
+@Override
+public void scale(FlinkResourceContext ctx) {
+var conf = ctx.getObserveConfig();
+var resource = ctx.getResource();
+var resourceId = ResourceID.fromResource(resource);
+var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+
+try {
+if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) {
+LOG.debug("Autoscaler is disabled");
+return;
+}
+
+// Initialize metrics only if autoscaler is enabled
+var status = resource.getStatus();
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE
+|| !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+LOG.info("Autoscaler is waiting for RUNNING job state");
+lastEvaluatedMetrics.remove(resourceId);
+return;
+}
+
+updateParallelismOverrides(ctx, conf, resource, resourceId, autoscalerMetrics);
+} catch (Throwable e) {
+onError(ctx, resource, autoscalerMetrics, e);
+} finally {
+applyParallelismOverrides(ctx);

Review Comment:
   good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




gyfora commented on code in PR #672:
URL: https://github.com/apache/flink-kubernetes-operator/pull/672#discussion_r1324635598


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -140,86 +171,73 @@ public void applyParallelismOverrides(FlinkResourceContext ctx) {
 ConfigurationUtils.convertValue(userOverrides, String.class));
 }
 
-@Override
-public boolean scale(FlinkResourceContext ctx) {
+private boolean updateParallelismOverrides(

Review Comment:
   good catch







gyfora commented on code in PR #672:
URL: https://github.com/apache/flink-kubernetes-operator/pull/672#discussion_r1324633942


##
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java:
##
@@ -74,6 +76,36 @@ public JobAutoScalerImpl(
 this.infoManager = new AutoscalerInfoManager();
 }
 
+@Override
+public void scale(FlinkResourceContext ctx) {
+var conf = ctx.getObserveConfig();
+var resource = ctx.getResource();
+var resourceId = ResourceID.fromResource(resource);
+var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId);
+

Review Comment:
   I think the `finally` block is fairly clean and makes it obvious that the overrides are always applied, exactly once, at the very end.
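To illustrate the guarantee described in this comment, here is a small self-contained Java sketch. The `Path` enum and the counter are hypothetical stand-ins for the real evaluation logic and for `applyParallelismOverrides(ctx)`. Whichever way the `try` body exits (normal completion, an early `return`, or an exception handled in the `catch`), the `finally` block runs exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FinallyOnceSketch {

    // Counts how often the (hypothetical) apply step runs.
    static final AtomicInteger applyCount = new AtomicInteger();

    // Hypothetical ways the body of scale() can exit.
    enum Path { NORMAL, EARLY_RETURN, ERROR }

    static void scale(Path path) {
        try {
            if (path == Path.EARLY_RETURN) {
                return; // e.g. the job is not RUNNING yet
            }
            if (path == Path.ERROR) {
                throw new IllegalStateException("metric evaluation failed");
            }
            // Normal path: overrides would be updated here.
        } catch (Throwable e) {
            // Error handling (onError in the PR) runs before the finally block.
        } finally {
            applyCount.incrementAndGet(); // stands in for applyParallelismOverrides(ctx)
        }
    }

    public static void main(String[] args) {
        for (Path p : Path.values()) {
            applyCount.set(0);
            scale(p);
            System.out.println(p + " -> applied " + applyCount.get() + " time(s)");
        }
    }
}
```

Putting the apply step in `finally` rather than before each `return` keeps a single call site, so a future early exit cannot accidentally skip or duplicate it.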


