Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


gyfora merged PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361898608


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext

https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361907227


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Thanks for these suggestions; these 3 classes are renamed in the last commit.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361902603


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1361881666


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/** The event handler for collecting scaling events. */
+public class ScalingRealizerCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingScalingRealizer`? That way the intention is clear.



##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/FlushCountableAutoscalerStateStore.java:
##
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.state;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import lombok.SneakyThrows;
+/** The state store counts the flush. */
+public class FlushCountableAutoscalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingAutoscalerStateStore`? That way the intention is clear.



##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import lombok.Getter;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The event handler for collecting events. */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>

Review Comment:
   Could we call this `TestingEventCollector`? That way the intention is clear.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


1996fanrui commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766110914

   > I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and it worked flawlessly. Thank you for the effort you've put into this, I really like the direction the autoscaler is heading.
   
   Thanks @mateczagany  for the test!





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


mateczagany commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1766010488

   I've also manually tested this change on Flink 1.18 (both reactive and non-reactive), and it worked flawlessly. 
   Thank you for the effort you've put into this, I really like the direction the autoscaler is heading.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


gyfora commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765929234

   > Hi @mxm @gyfora , thanks for your thorough review!
   > 
   > I have now addressed all comments on this PR. Some interfaces, code design and code style are improved, and there were no big changes during the review. I have updated the FLIP doc with the latest interfaces.
   > 
   > About manual testing: I rebuilt the image just now to check whether it's fine, and ran the `AutoscalingExample` with `autoscaling.yaml`. In general, it works well. There are some WARN logs because the demo jobs still use the deprecated configuration keys; I have updated them in the last fix commit.
   > 
   > Please help review again in your free time, big thanks~ ❤️
   > 
   > I can squash the last commit after your check. Also, please let me know if you think more manual testing is needed, thanks.
   > 
   > ```
   > 2023-10-17 15:33:56,930 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.enabled' instead of proper key 
'job.autoscaler.enabled'
   > 2023-10-17 15:33:56,931 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.enabled' instead of proper key 
'job.autoscaler.enabled'
   > 2023-10-17 15:33:57,205 o.a.f.a.ScalingMetricCollector [INFO ] 
[default.autoscaling-example] Job updated at 2023-10-17T07:32:56.133Z. Clearing 
metrics.
   > 2023-10-17 15:33:57,331 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.metrics.window' instead of proper key 
'job.autoscaler.metrics.window'
   > 2023-10-17 15:33:57,332 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.stabilization.interval' instead of proper 
key 'job.autoscaler.stabilization.interval'
   > 2023-10-17 15:33:57,397 o.a.f.a.ScalingMetricCollector [WARN ] 
[default.autoscaling-example] pendingRecords metric for 
cbc357ccb763df2852fee8c4fc7d55f2 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
   > 2023-10-17 15:33:57,620 o.a.f.a.ScalingMetricCollector [INFO ] 
[default.autoscaling-example] Metric window not full until 
2023-10-17T07:36:56.931915Z
   > 2023-10-17 15:33:57,623 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of 
proper key 'job.autoscaler.target.utilization.boundary'
   > 2023-10-17 15:33:57,624 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of 
proper key 'job.autoscaler.target.utilization.boundary'
   > ```
   
   Thank you @1996fanrui for the persistence and all the nice work. I will do a 
final pass on this later today, let's try to merge this as soon as possible to 
unblock current work on the autoscaler :) 





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-17 Thread via GitHub


1996fanrui commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1765865647

   Hi @mxm @gyfora , thanks for your thorough review! 
   
   I have now addressed all comments on this PR. Some interfaces, code design and code style are improved, and there were no big changes during the review. I have updated the FLIP doc with the latest interfaces.
   
   About manual testing: I rebuilt the image just now to check whether it's fine, and ran the `AutoscalingExample` with `autoscaling.yaml`. In general, it works well. There are some WARN logs because the demo jobs still use the deprecated configuration keys; I have updated them in the last fix commit. 
   
   Please help review again in your free time, big thanks~ ❤️ 
   
   I can squash the last commit after your check. Also, please let me know if you think more manual testing is needed, thanks.
   
   ```
   2023-10-17 15:33:56,930 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.enabled' instead of proper key 
'job.autoscaler.enabled'
   2023-10-17 15:33:56,931 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.enabled' instead of proper key 
'job.autoscaler.enabled'
   2023-10-17 15:33:57,205 o.a.f.a.ScalingMetricCollector [INFO ] 
[default.autoscaling-example] Job updated at 2023-10-17T07:32:56.133Z. Clearing 
metrics.
   2023-10-17 15:33:57,331 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.metrics.window' instead of proper key 
'job.autoscaler.metrics.window'
   2023-10-17 15:33:57,332 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.stabilization.interval' instead of proper 
key 'job.autoscaler.stabilization.interval'
   2023-10-17 15:33:57,397 o.a.f.a.ScalingMetricCollector [WARN ] 
[default.autoscaling-example] pendingRecords metric for 
cbc357ccb763df2852fee8c4fc7d55f2 could not be found. Either a legacy source or 
an idle source. Assuming no pending records.
   2023-10-17 15:33:57,620 o.a.f.a.ScalingMetricCollector [INFO ] 
[default.autoscaling-example] Metric window not full until 
2023-10-17T07:36:56.931915Z
   2023-10-17 15:33:57,623 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of 
proper key 'job.autoscaler.target.utilization.boundary'
   2023-10-17 15:33:57,624 o.a.f.c.Configuration  [WARN ] 
[default.autoscaling-example] Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization.boundary' instead of 
proper key 'job.autoscaler.target.utilization.boundary'
   ```
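
For readers unfamiliar with the WARN lines above, here is a minimal sketch of how a deprecated-key fallback can behave, assuming the old keys are simply the new keys with a `kubernetes.operator.` prefix (as the log suggests). `DeprecatedKeyLookup` is a hypothetical helper written for illustration; it is not Flink's `Configuration` class and does not reproduce its actual lookup logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not a Flink class): looks up a key, then its deprecated alias. */
final class DeprecatedKeyLookup {
    /** Old prefix, as seen in the WARN lines above. */
    static final String DEPRECATED_PREFIX = "kubernetes.operator.";

    /** Warnings emitted so far; a real implementation would log them instead. */
    final List<String> warnings = new ArrayList<>();

    Optional<String> get(Map<String, String> conf, String properKey) {
        String value = conf.get(properKey);
        if (value != null) {
            return Optional.of(value); // the proper key wins and produces no warning
        }
        String deprecatedKey = DEPRECATED_PREFIX + properKey;
        String fallback = conf.get(deprecatedKey);
        if (fallback != null) {
            warnings.add("Config uses deprecated configuration key '" + deprecatedKey
                    + "' instead of proper key '" + properKey + "'");
        }
        return Optional.ofNullable(fallback);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("kubernetes.operator.job.autoscaler.enabled", "true");
        DeprecatedKeyLookup lookup = new DeprecatedKeyLookup();
        System.out.println(lookup.get(conf, "job.autoscaler.enabled"));
        System.out.println(lookup.warnings);
    }
}
```

Under this assumption, switching the demo job to the proper keys makes the first lookup succeed and the warnings disappear, which matches the fix described above.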





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-16 Thread via GitHub


mateczagany commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360856610


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -441,15 +442,21 @@ protected Collection 
queryAggregatedMetricNames(
 
 protected abstract Map>
 queryAllAggregatedMetrics(
-AbstractFlinkResource cr,
-FlinkService flinkService,
-Configuration conf,
+Context ctx,
 Map> 
filteredVertexMetricNames);
 
-public void cleanup(AbstractFlinkResource cr) {
-var resourceId = ResourceID.fromResource(cr);
-histories.remove(resourceId);
-availableVertexMetricNames.remove(resourceId);
+public JobDetailsInfo getJobDetailsInfo(
+JobAutoScalerContext context, Duration clientTimeout) throws 
Exception {

Review Comment:
   That makes sense, I think you've explained it very well, thank you! 
   
   I still think that the naming and the abstraction can be a bit confusing for 
a fresh pair of eyes, but as you've said and as I've said in my original 
comment, this is not related to your PR, so I think it's perfectly fine to 
leave it as is.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-15 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360079090


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -441,15 +442,21 @@ protected Collection 
queryAggregatedMetricNames(
 
 protected abstract Map>
 queryAllAggregatedMetrics(
-AbstractFlinkResource cr,
-FlinkService flinkService,
-Configuration conf,
+Context ctx,
 Map> 
filteredVertexMetricNames);
 
-public void cleanup(AbstractFlinkResource cr) {
-var resourceId = ResourceID.fromResource(cr);
-histories.remove(resourceId);
-availableVertexMetricNames.remove(resourceId);
+public JobDetailsInfo getJobDetailsInfo(
+JobAutoScalerContext context, Duration clientTimeout) throws 
Exception {

Review Comment:
   Thanks @mateczagany for this comment.
   
   IIUC, you mean that `ScalingMetricCollector` uses the `RestClusterClient`, and `RestApiMetricsCollector` is entirely based on `RestClusterClient`, so these 2 classes can be merged into one class, right?
   
   If so, let me try to explain the difference between `RestApiMetricsCollector` and `ScalingMetricCollector`.
   
   - `RestApiMetricsCollector` calls `RestClusterClient`, and it's used to fetch specific metrics.
   - `ScalingMetricCollector` calls `RestClusterClient` and is not used to fetch specific metrics.
   - `RestClusterClient` is used in `ScalingMetricCollector` to get some job metadata, such as: `getJobDetailsInfo` to generate the `JobTopology`, `queryFilteredMetricNames`, `updateKafkaSourceMaxParallelisms`.
   - The `JobTopology` is the metadata of the job, and it cannot be fetched from metrics. That means the `RestClusterClient` is needed even if we query specific metrics from another system.
   
   Based on this, it may be better to keep `ScalingMetricCollector` as an abstract class and not remove `RestApiMetricsCollector`. That keeps it easy to fetch specific metrics from another system in the future.
   
   Also, `ScalingMetricCollector` already uses the `RestClusterClient` on the current master branch [1].
   
   WDYT? And please correct me if my understanding is wrong, thanks~
   
   [1] 
https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java#L368C45-L368C45
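
The split argued for above can be sketched roughly as follows: the abstract base owns the job metadata (always fetched over REST), while a subclass decides only where metric values come from. All names here (`JobMetadataSource`, `CollectorSketch`, `MapBackedCollector`) are hypothetical and heavily simplified; they are not the real `ScalingMetricCollector` API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the job metadata that only the REST client provides. */
interface JobMetadataSource {
    List<String> vertexIds();
}

/** Sketch of the abstract collector: topology from REST, metric values from subclasses. */
abstract class CollectorSketch {
    private final JobMetadataSource metadata;

    CollectorSketch(JobMetadataSource metadata) {
        this.metadata = metadata;
    }

    /** Walks the topology and asks the subclass for each vertex's metric value. */
    final Map<String, Double> collect() {
        Map<String, Double> result = new LinkedHashMap<>();
        for (String vertexId : metadata.vertexIds()) {
            result.put(vertexId, queryMetric(vertexId));
        }
        return result;
    }

    /** The only part a non-REST metrics backend would have to replace. */
    protected abstract double queryMetric(String vertexId);
}

/** Hypothetical backend that serves metric values from a fixed map. */
final class MapBackedCollector extends CollectorSketch {
    private final Map<String, Double> values;

    MapBackedCollector(JobMetadataSource metadata, Map<String, Double> values) {
        super(metadata);
        this.values = values;
    }

    @Override
    protected double queryMetric(String vertexId) {
        // Unknown vertices default to 0.0 in this sketch.
        return values.getOrDefault(vertexId, 0.0);
    }
}
```

In this shape, a metrics backend other than the REST API would only need to override `queryMetric`, while the topology lookup stays in the base class.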






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-15 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1360032639


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext {
+
+private final AbstractFlinkResource resource;
+
+private final KubernetesClient kubernetesClient;
+
+public KubernetesJobAutoScalerContext(
+JobID jobID,

Review Comment:
   Thanks for pointing it out, updated.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-14 Thread via GitHub


mateczagany commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1359305741


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** An implementation of JobAutoscalerContext for Kubernetes. */
+public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext {
+
+private final AbstractFlinkResource resource;
+
+private final KubernetesClient kubernetesClient;
+
+public KubernetesJobAutoScalerContext(
+JobID jobID,

Review Comment:
   jobID should also be `@Nullable` 



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -441,15 +442,21 @@ protected Collection 
queryAggregatedMetricNames(
 
 protected abstract Map>
 queryAllAggregatedMetrics(
-AbstractFlinkResource cr,
-FlinkService flinkService,
-Configuration conf,
+Context ctx,
 Map> 
filteredVertexMetricNames);
 
-public void cleanup(AbstractFlinkResource cr) {
-var resourceId = ResourceID.fromResource(cr);
-histories.remove(resourceId);
-availableVertexMetricNames.remove(resourceId);
+public JobDetailsInfo getJobDetailsInfo(
+JobAutoScalerContext context, Duration clientTimeout) throws 
Exception {

Review Comment:
   I don't expect it to be fixed in this PR, just want to note that this is an 
abstract class with `RestApiMetricsCollector` extending it. This makes it seem 
like it would be possible to extend this class to be able to collect metrics 
via methods other than REST API.
   
   This would already be a challenge as there are several REST API calls 
already in this class, but with this PR, we will also tie this class to REST 
API with `JobAutoScalerContext`.
   
   Perhaps it makes sense to remove `RestApiMetricsCollector` at this point?






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-12 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355991932


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+public static final ConfigOption FLINK_CLIENT_TIMEOUT =
+autoScalerConfig("flink.client.timeout")
+.durationType()
+.defaultValue(Duration.ofSeconds(10))
+.withDescription("The timeout for waiting the flink rest 
client to return.");

Review Comment:
   Do you mean moving `KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT` here?
   
   It's a generic option for the Kubernetes operator: it's used not only by the autoscaler but also by a lot of `AbstractFlinkService` methods. That's why I didn't move it here.
   
   WDYT?






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-12 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356620090


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+/**
+ * Handle the event.
+ *
+ * @param interval When interval is great than 0, events that repeat 
within the interval will be
+ * ignored.
+ */
+void handleEvent(
+Context context,
+Type type,
+String reason,
+String message,
+@Nullable String messageKey,
+@Nullable Duration interval);
+
+/** The type of the events. */
+enum Type {
+Normal,
+Error

Review Comment:
   Thanks for the feedback, I have reverted `Error` to `Warning` with a force push.
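
The `interval` contract in the Javadoc quoted above ("events that repeat within the interval will be ignored") could be implemented along these lines. This is only a sketch under assumptions: `DedupingEventRecorder` is a hypothetical class, events are keyed by `messageKey` alone, the clock is passed in explicitly, and the window is measured from the last recorded (not the last ignored) event. The handlers in the PR may behave differently.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical recorder that drops events repeated within a given interval. */
final class DedupingEventRecorder {
    /** Time of the last recorded event per message key. */
    private final Map<String, Instant> lastRecorded = new HashMap<>();

    /** Messages that were actually recorded, in order. */
    final List<String> recorded = new ArrayList<>();

    /** Returns true if the event was recorded, false if it was ignored as a repeat. */
    boolean handleEvent(String messageKey, String message, Duration interval, Instant now) {
        // A null, zero or negative interval disables deduplication.
        boolean dedupe = interval != null && !interval.isZero() && !interval.isNegative();
        Instant previous = lastRecorded.get(messageKey);
        if (dedupe && previous != null
                && Duration.between(previous, now).compareTo(interval) < 0) {
            return false; // repeated within the interval: ignore
        }
        lastRecorded.put(messageKey, now);
        recorded.add(message);
        return true;
    }
}
```

Passing the current time as a parameter is a choice made here to keep the sketch deterministic in tests; it is not taken from the PR.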






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-12 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356604652


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl>
+implements JobAutoScaler {
+
+private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+@VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+private final ScalingMetricCollector metricsCollector;
+private final ScalingMetricEvaluator evaluator;
+private final ScalingExecutor scalingExecutor;
+private final AutoScalerEventHandler eventHandler;
+private final ScalingRealizer scalingRealizer;
+private final AutoScalerStateStore stateStore;
+
+@VisibleForTesting
+final Map>>
+lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+@VisibleForTesting
+final Map flinkMetrics = new ConcurrentHashMap<>();
+
+public JobAutoScalerImpl(
+ScalingMetricCollector metricsCollector,
+ScalingMetricEvaluator evaluator,
+ScalingExecutor scalingExecutor,
+AutoScalerEventHandler eventHandler,
+ScalingRealizer scalingRealizer,
+AutoScalerStateStore stateStore) {
+this.metricsCollector = metricsCollector;
+this.evaluator = evaluator;
+this.scalingExecutor = scalingExecutor;
+this.eventHandler = eventHandler;
+this.scalingRealizer = scalingRealizer;
+this.stateStore = stateStore;
+}
+
+@Override
+public void scale(Context ctx) throws Exception {
+var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+try {
+if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+LOG.debug("Autoscaler is disabled");
+clearParallelismOverrides(ctx);
+return;
+}
+
+if (ctx.getJobStatus() != JobStatus.RUNNING) {
+lastEvaluatedMetrics.remove(ctx.getJobKey());
+return;
+}
+
+runScalingLogic(ctx, autoscalerMetrics);
+stateStore.flush(ctx);

Review Comment:
   Looks good to me. Just wanted to double-check! Thank you.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-12 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356605280


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+/**
+ * Handle the event.
+ *
+ * @param interval When interval is great than 0, events that repeat within the interval will be
+ * ignored.
+ */
+void handleEvent(
+Context context,
+Type type,
+String reason,
+String message,
+@Nullable String messageKey,
+@Nullable Duration interval);
+
+/** The type of the events. */
+enum Type {
+Normal,
+Error

Review Comment:
   Fine with me. Thanks for the explanation!






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-12 Thread via GitHub


XComp commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356512684


##
flink-autoscaler/pom.xml:
##
@@ -45,6 +45,32 @@ under the License.
 provided
 
 
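+<dependency>
+<groupId>org.projectlombok</groupId>
+<artifactId>lombok</artifactId>
+<version>${lombok.version}</version>
+<scope>provided</scope>
+</dependency>
+
+<dependency>
+<groupId>org.junit.jupiter</groupId>
+<artifactId>junit-jupiter-params</artifactId>
+<scope>test</scope>
+</dependency>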
+
+

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1356017440


##
flink-autoscaler/pom.xml:
##
@@ -45,6 +45,32 @@ under the License.
 provided
 
 
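+<dependency>
+<groupId>org.projectlombok</groupId>
+<artifactId>lombok</artifactId>
+<version>${lombok.version}</version>
+<scope>provided</scope>
+</dependency>
+
+<dependency>
+<groupId>org.junit.jupiter</groupId>
+<artifactId>junit-jupiter-params</artifactId>
+<scope>test</scope>
+</dependency>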
+
+

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355987715


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+@Test
+public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+new RestApiMetricsCollector<>();
+
+JobVertexID jobVertexID = new JobVertexID();
+Map<String, FlinkMetric> flinkMetrics =
+Map.of(
+"a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+"b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+List<AggregatedMetric> aggregatedMetricsResponse =
+List.of(
+new AggregatedMetric(
+"a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+new AggregatedMetric(
+"b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+new AggregatedMetric(
+"c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+Configuration conf = new Configuration();
+RestClusterClient<String> restClusterClient =
+new RestClusterClient<>(
+conf,
+"test-cluster",
+(c, e) -> new StandaloneClientHAServices("localhost")) {
+@Override
+public <
+M extends MessageHeaders<R, P, U>,
+U extends MessageParameters,
+R extends RequestBody,
+P extends ResponseBody>
+CompletableFuture<P> sendRequest(
+M messageHeaders, U messageParameters, R request) {
+if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+return (CompletableFuture<P>)
+CompletableFuture.completedFuture(
+new AggregatedMetricsResponseBody(
+aggregatedMetricsResponse));
+}
+return (CompletableFuture<P>)
+CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+}
+};
+
+JobID jobID = new JobID();
+JobAutoScalerContext context =
+new JobAutoScalerContext<>(
+jobID,
+jobID,
+ 

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355181704


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+@Test
+public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
+RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+new RestApiMetricsCollector<>();
+
+JobVertexID jobVertexID = new JobVertexID();
+Map<String, FlinkMetric> flinkMetrics =
+Map.of(
+"a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+"b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+Map<JobVertexID, Map<String, FlinkMetric>> metrics = Map.of(jobVertexID, flinkMetrics);
+
+List<AggregatedMetric> aggregatedMetricsResponse =
+List.of(
+new AggregatedMetric(
+"a.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+new AggregatedMetric(
+"b.pendingRecords", Double.NaN, Double.NaN, Double.NaN, 100.),
+new AggregatedMetric(
+"c.unrelated", Double.NaN, Double.NaN, Double.NaN, 100.));
+
+Configuration conf = new Configuration();
+RestClusterClient<String> restClusterClient =
+new RestClusterClient<>(
+conf,
+"test-cluster",
+(c, e) -> new StandaloneClientHAServices("localhost")) {
+@Override
+public <
+M extends MessageHeaders<R, P, U>,
+U extends MessageParameters,
+R extends RequestBody,
+P extends ResponseBody>
+CompletableFuture<P> sendRequest(
+M messageHeaders, U messageParameters, R request) {
+if (messageHeaders instanceof AggregatedSubtaskMetricsHeaders) {
+return (CompletableFuture<P>)
+CompletableFuture.completedFuture(
+new AggregatedMetricsResponseBody(
+aggregatedMetricsResponse));
+}
+return (CompletableFuture<P>)
+CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+}
+};
+
+JobID jobID = new JobID();
+JobAutoScalerContext context =
+new JobAutoScalerContext<>(
+jobID,
+jobID,
+

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355175979


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+/**
+ * Handle the event.
+ *
+ * @param interval When interval is great than 0, events that repeat within the interval will be
+ * ignored.
+ */
+void handleEvent(
+Context context,
+Type type,
+String reason,
+String message,
+@Nullable String messageKey,
+@Nullable Duration interval);
+
+/** The type of the events. */
+enum Type {
+Normal,
+Error

Review Comment:
   Why not Normal, _Warning_, Error?
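
   As an illustration of the `interval` parameter documented in the quoted Javadoc (events that repeat within the interval are ignored), here is a minimal sketch in plain Java. `EventDeduplicator` and `shouldEmit` are hypothetical names and not part of the PR. The sketch assumes that events are identified by their message key and that a null or non-positive interval disables suppression.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not from the PR): decides whether an event is emitted or suppressed. */
final class EventDeduplicator {
    // Last time an event with a given message key was actually emitted.
    private final Map<String, Instant> lastEmitted = new HashMap<>();

    /**
     * Returns true if the event should be emitted. Suppression only applies when the
     * interval is non-null and greater than zero (an assumption based on the Javadoc).
     */
    boolean shouldEmit(String messageKey, Duration interval, Instant now) {
        boolean suppressionActive =
                interval != null && !interval.isZero() && !interval.isNegative();
        if (suppressionActive) {
            Instant last = lastEmitted.get(messageKey);
            // Still inside the interval since the last emission: ignore the repeat.
            if (last != null && Duration.between(last, now).compareTo(interval) < 0) {
                return false;
            }
        }
        lastEmitted.put(messageKey, now);
        return true;
    }
}

final class EventDeduplicatorDemo {
    public static void main(String[] args) {
        EventDeduplicator dedup = new EventDeduplicator();
        Instant start = Instant.parse("2023-10-11T00:00:00Z");
        Duration interval = Duration.ofSeconds(10);
        System.out.println(dedup.shouldEmit("scaling-report", interval, start));
        System.out.println(dedup.shouldEmit("scaling-report", interval, start.plusSeconds(5)));
        System.out.println(dedup.shouldEmit("scaling-report", interval, start.plusSeconds(10)));
    }
}
```

   Passing the clock value in as a parameter keeps the helper deterministic in tests; a real handler would more likely read the current time itself.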



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl>
+implements JobAutoScaler {
+
+private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+@VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+private final ScalingMetricCollector metricsCollector;
+private final ScalingMetricEvaluator evaluator;
+private final ScalingExecutor scalingExecutor;
+private final AutoScalerEventHandler eventHandler;
+private final ScalingRealizer scalingRealizer;
+private final AutoScalerStateStore stateStore;
+
+@VisibleForTesting
+final Map>>
+lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+@VisibleForTesting
+final Map flinkMetrics = new 

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355174254


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
+.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+autoScalerConfig("flink.client.timeout")
+.durationType()
+.defaultValue(Duration.ofSeconds(10))
+.withDescription("The timeout for waiting the flink rest client to return.");

Review Comment:
   Let's keep the existing option. We can move it here like you did, but it should continue to support `flink.client.timeout`. We can add it as a deprecated key.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355160155


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+String jobId = status.getJobStatus().getJobId();
+
+JobStatus jobStatus = generateJobStatusEnum(status);
+
+return new KubernetesJobAutoScalerContext(
+jobId == null ? null : JobID.fromHexString(jobId),
+jobStatus,
+conf,
+getResourceMetricGroup(),
+() -> getFlinkService().getClusterClient(conf),
+resource,
+getKubernetesClient());
+}
+
+@Nullable
+private JobStatus generateJobStatusEnum(CommonStatus status) {
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+return null;
+}
+
+String state = status.getJobStatus().getState();
+if (state == null) {
+return null;
+}
+return JobStatus.valueOf(state);

Review Comment:
   (Didn't see your comments until I hit refresh). Fine to defer this.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354624451


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+String jobId = status.getJobStatus().getJobId();
+
+JobStatus jobStatus = generateJobStatusEnum(status);
+
+return new KubernetesJobAutoScalerContext(
+jobId == null ? null : JobID.fromHexString(jobId),
+jobStatus,
+conf,
+getResourceMetricGroup(),
+() -> getFlinkService().getClusterClient(conf),
+resource,
+getKubernetesClient());
+}
+
+@Nullable
+private JobStatus generateJobStatusEnum(CommonStatus status) {
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+return null;
+}
+
+String state = status.getJobStatus().getState();
+if (state == null) {
+return null;
+}
+return JobStatus.valueOf(state);

Review Comment:
   Sure, if that does not cause any problems with the CRD state, that is fine with me.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


XComp commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354589163


##
flink-autoscaler/pom.xml:
##
@@ -45,6 +45,32 @@ under the License.
 provided
 
 
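+<dependency>
+<groupId>org.projectlombok</groupId>
+<artifactId>lombok</artifactId>
+<version>${lombok.version}</version>
+<scope>provided</scope>
+</dependency>
+
+<dependency>
+<groupId>org.junit.jupiter</groupId>
+<artifactId>junit-jupiter-params</artifactId>
+<scope>test</scope>
+</dependency>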
+
+

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354562864


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+/** The identifier of each flink job. */
+KEY getJobKey();
+
+JobID getJobID();
+
+Configuration getConfiguration();
+
+MetricGroup getMetricGroup();
+
+RestClusterClient getRestClusterClient() throws Exception;

Review Comment:
   We intentionally avoided too many wrappers, generated clients, etc. whenever possible. Testing is not too bad, and we would like to avoid the interface/factory hell that is currently present in core Flink :)






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


XComp commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354557198


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {
+
+/** The identifier of each flink job. */
+KEY getJobKey();
+
+JobID getJobID();
+
+Configuration getConfiguration();
+
+MetricGroup getMetricGroup();
+
+RestClusterClient getRestClusterClient() throws Exception;

Review Comment:
   Sorry for the delayed response; I was on vacation and busy with other things. I guess you're right. The `ClusterClient` interface doesn't cover all the methods that are provided by the `RestClusterClient`. It feels like we would have to revisit the `ClusterClient` to align things again. One workaround would be to provide a wrapper interface that only reveals the methods that are needed by the Kubernetes Operator. WDYT?
   
   My concern is that using a concrete implementation might make testing harder.
   
   Btw., have we thought of relying on the OpenAPI definition to generate a REST client?
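
   The wrapper-interface workaround mentioned above could look roughly like the following sketch in plain Java. `MetricsClient`, `PendingRecordsReader`, and `queryAggregatedMetrics` are hypothetical names, not part of Flink or this PR. The point is only that a consumer depending on a narrow interface can be tested with a lambda instead of a subclassed concrete client.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical narrow view that exposes only the call the consumer needs. */
interface MetricsClient {
    CompletableFuture<Map<String, Double>> queryAggregatedMetrics(String vertexId);
}

/** Hypothetical consumer that depends on the narrow interface, not on a concrete REST client. */
final class PendingRecordsReader {
    private final MetricsClient client;

    PendingRecordsReader(MetricsClient client) {
        this.client = client;
    }

    /** Sums every metric whose name ends with ".pendingRecords" and ignores the rest. */
    double totalPendingRecords(String vertexId) throws Exception {
        return client.queryAggregatedMetrics(vertexId).get().entrySet().stream()
                .filter(e -> e.getKey().endsWith(".pendingRecords"))
                .mapToDouble(e -> e.getValue())
                .sum();
    }
}

final class NarrowClientDemo {
    public static void main(String[] args) throws Exception {
        // In a test, the fake client is a one-line lambda.
        MetricsClient fake =
                vertexId ->
                        CompletableFuture.completedFuture(
                                Map.of(
                                        "a.pendingRecords", 100.0,
                                        "b.pendingRecords", 100.0,
                                        "c.unrelated", 100.0));
        System.out.println(new PendingRecordsReader(fake).totalPendingRecords("vertex-1"));
    }
}
```

   The trade-off is one more abstraction to maintain next to the real client.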






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354382276


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+String jobId = status.getJobStatus().getJobId();
+
+JobStatus jobStatus = generateJobStatusEnum(status);
+
+return new KubernetesJobAutoScalerContext(
+jobId == null ? null : JobID.fromHexString(jobId),
+jobStatus,
+conf,
+getResourceMetricGroup(),
+() -> getFlinkService().getClusterClient(conf),
+resource,
+getKubernetesClient());
+}
+
+@Nullable
+private JobStatus generateJobStatusEnum(CommonStatus status) {
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+return null;
+}
+
+String state = status.getJobStatus().getState();
+if (state == null) {
+return null;
+}
+return JobStatus.valueOf(state);

Review Comment:
   Sure, I didn't change it in this PR. I created 
[FLINK-33237](https://issues.apache.org/jira/browse/FLINK-33237) to optimize 
this after this PR.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354365364


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+String jobId = status.getJobStatus().getJobId();
+
+JobStatus jobStatus = generateJobStatusEnum(status);
+
+return new KubernetesJobAutoScalerContext(
+jobId == null ? null : JobID.fromHexString(jobId),
+jobStatus,
+conf,
+getResourceMetricGroup(),
+() -> getFlinkService().getClusterClient(conf),
+resource,
+getKubernetesClient());
+}
+
+@Nullable
+private JobStatus generateJobStatusEnum(CommonStatus status) {
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+return null;
+}
+
+String state = status.getJobStatus().getState();
+if (state == null) {
+return null;
+}
+return JobStatus.valueOf(state);

Review Comment:
   Let's not change this as part of this PR, but feel free to open a JIRA for 
it.






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354340358


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##
@@ -47,6 +55,48 @@ public abstract class FlinkResourceContext status = getResource().getStatus();
+String jobId = status.getJobStatus().getJobId();
+
+JobStatus jobStatus = generateJobStatusEnum(status);
+
+return new KubernetesJobAutoScalerContext(
+jobId == null ? null : JobID.fromHexString(jobId),
+jobStatus,
+conf,
+getResourceMetricGroup(),
+() -> getFlinkService().getClusterClient(conf),
+resource,
+getKubernetesClient());
+}
+
+@Nullable
+private JobStatus generateJobStatusEnum(CommonStatus status) {
+if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
+return null;
+}
+
+String state = status.getJobStatus().getState();
+if (state == null) {
+return null;
+}
+return JobStatus.valueOf(state);

Review Comment:
   Hi @mxm  @gyfora , could the type of 
`org.apache.flink.kubernetes.operator.api.status.JobStatus#state` be changed 
from `String` to `org.apache.flink.api.common.JobStatus`?
   
   As far as I can see, all setters already respect it. If it can be changed, the 
`JobStatus.valueOf(state)` call isn't necessary.
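
A small sketch of what a `String`-typed state costs every reader. `JobState` is a stand-in enum, not Flink's `org.apache.flink.api.common.JobStatus`, and `StateParsing` is hypothetical. The point is that `valueOf` throws on `null` and on unknown names, so callers need guards that an enum-typed field would make unnecessary.

```java
import java.util.Optional;

/** Stand-in for the real job status enum, reduced to a few values. */
enum JobState { CREATED, RUNNING, FAILED, FINISHED }

final class StateParsing {
    /**
     * Converts a String-typed state defensively: null and names that are not
     * enum constants both yield an empty result instead of an exception.
     */
    static Optional<JobState> parse(String state) {
        if (state == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(JobState.valueOf(state));
        } catch (IllegalArgumentException e) {
            // valueOf rejects any name that is not an exact constant name.
            return Optional.empty();
        }
    }
}
```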






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-11 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1354332455


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws 
Exception {
 }
 }
 
+private void scaling(FlinkResourceContext ctx) throws Exception {
+KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+if (autoscalerDisabled(ctx)) {
+autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+resourceScaler.scale(autoScalerContext);
+return;
+}
+if (waitingForRunning(ctx.getResource().getStatus())) {
+LOG.info("Autoscaler is waiting for  stable, running state");
+resourceScaler.cleanup(autoScalerContext.getJobKey());
+return;

Review Comment:
   Thanks to @mxm for the suggestion!
   
   I have moved the `waitingForRunning` logic back into the autoscaler module and 
restored the related tests.
   
   Also, the parts covered by these 2 comments [1][2] have been restored as well.
   
   [1] 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350082356
   [2] 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350079573






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352333625


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
 protected final EventRecorder eventRecorder;
 protected final StatusRecorder statusRecorder;
-protected final JobAutoScaler resourceScaler;
+protected final JobAutoScaler 
resourceScaler;

Review Comment:
   done






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352290023


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext ctx) throws 
Exception {
 }
 }
 
+private void scaling(FlinkResourceContext ctx) throws Exception {
+KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+if (autoscalerDisabled(ctx)) {
+autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+resourceScaler.scale(autoScalerContext);
+return;
+}
+if (waitingForRunning(ctx.getResource().getStatus())) {
+LOG.info("Autoscaler is waiting for  stable, running state");
+resourceScaler.cleanup(autoScalerContext.getJobKey());
+return;

Review Comment:
   Preferably, I would like any logic related to applying parallelism to live 
inside the autoscaler implementation. This shouldn't change while the autoscaler 
is waiting for the running state. In fact, the job state checks should also be 
performed by the autoscaler, not by the reconciler. The current code splits 
control over the parallelism overrides between the reconciler and the 
autoscaler.
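
A rough sketch of the split described above, using stand-in types only. `SketchJobStatus`, `SketchContext`, `SketchAutoscaler`, and `SketchReconciler` are hypothetical and much simpler than the real classes. The reconciler delegates unconditionally, and the autoscaler alone decides whether the job state allows scaling.

```java
/** Stand-in for the job status carried by the autoscaler context. */
enum SketchJobStatus { CREATED, RUNNING, FAILING }

/** Minimal stand-in context: only the field this sketch needs. */
final class SketchContext {
    final SketchJobStatus jobStatus; // may be null while the resource is not stable

    SketchContext(SketchJobStatus jobStatus) {
        this.jobStatus = jobStatus;
    }
}

final class SketchAutoscaler {
    /** Returns a short description of the outcome so the behaviour is easy to check. */
    String scale(SketchContext ctx) {
        // The job-state check lives here, not in the reconciler.
        if (ctx.jobStatus != SketchJobStatus.RUNNING) {
            return "waiting";
        }
        return "scaled";
    }
}

final class SketchReconciler {
    private final SketchAutoscaler autoscaler = new SketchAutoscaler();

    /** The reconciler never inspects the job state; it always delegates. */
    String reconcile(SketchContext ctx) {
        return autoscaler.scale(ctx);
    }
}
```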






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1352282612


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##
@@ -72,7 +79,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
 protected final EventRecorder eventRecorder;
 protected final StatusRecorder statusRecorder;
-protected final JobAutoScaler resourceScaler;
+protected final JobAutoScaler 
resourceScaler;

Review Comment:
   Can we rename this?
   
   ```suggestion
   protected final JobAutoScaler autoscaler;
   ```






Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-10 Thread via GitHub


1996fanrui commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+private static String deprecatedOperatorConfigKey(String key) {
+return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+}
+
+private static String autoScalerConfigKey(String key) {
+return AUTOSCALER_CONF_PREFIX + key;
+}
+
 private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-return operatorConfig("job.autoscaler." + key);
+return ConfigOptions.key(autoScalerConfigKey(key));
 }
 
 public static final ConfigOption AUTOSCALER_ENABLED =
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
+.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   Added:
   > Note: The option prefix `kubernetes.operator.` was removed in FLIP-334, 
because the autoscaler module was decoupled from flink-kubernetes-operator.
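
To illustrate the effect of the prefix rename for existing users, here is a simplified sketch of a deprecated-key fallback over a plain `Map`. `PrefixFallback` and `lookup` are hypothetical helpers and not Flink's `Configuration` implementation. The two prefix constants mirror the ones in the diff above.

```java
import java.util.Map;
import java.util.Optional;

final class PrefixFallback {
    static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator.";
    static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";

    /** Looks up the new key first and falls back to the old, prefixed key. */
    static Optional<String> lookup(Map<String, String> conf, String key) {
        String newKey = AUTOSCALER_CONF_PREFIX + key;
        String oldKey = DEPRECATED_K8S_OP_CONF_PREFIX + newKey;
        if (conf.containsKey(newKey)) {
            return Optional.ofNullable(conf.get(newKey));
        }
        return Optional.ofNullable(conf.get(oldKey));
    }
}
```

Under this assumption, a deployment that still sets `kubernetes.operator.job.autoscaler.enabled` keeps working, and the new `job.autoscaler.enabled` key wins when both are present.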
   



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl>
+implements JobAutoScaler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+@VisibleForTesting protected static final String AUTOSCALER_ERROR = 
"AutoscalerError";

Review Comment:
   It's used in `BacklogBasedScalingTest`.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ 

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-09 Thread via GitHub


mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .stringType()
 .asList()
 .defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
 .withDescription(
 "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+public static final ConfigOption FLINK_CLIENT_TIMEOUT =
+autoScalerConfig("flink.client.timeout")
+.durationType()
+.defaultValue(Duration.ofSeconds(10))
+.withDescription("The timeout for waiting the flink rest 
client to return.");

Review Comment:
   Do we need to expose this as a configuration or can we just use the default? 
   
   I think it is better to let the user configure standard Flink configs as 
listed here: 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+private static String deprecatedOperatorConfigKey(String key) {
+return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+}
+
+private static String autoScalerConfigKey(String key) {
+return AUTOSCALER_CONF_PREFIX + key;
+}
+
 private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-return operatorConfig("job.autoscaler." + key);
+return ConfigOptions.key(autoScalerConfigKey(key));
 }
 
 public static final ConfigOption AUTOSCALER_ENABLED =
 autoScalerConfig("enabled")
 .booleanType()
 .defaultValue(false)
+.withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   This might confuse some existing users because the deprecated keys will not 
appear on the configuration page. Can we add a note on the configuration page 
that we renamed the configuration prefix?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##
@@ -38,42 +36,42 @@
 import java.util.Map;
 import java.util.SortedMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-09 Thread via GitHub


mxm commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752727711

   > If it's still needed, I can restore the 
`flink-kubernetes-operator-autoscaler` module and move all 
kubernetes-autoscaler related classes into it. WDYT?
   
   Given that the autoscaler module is now decoupled from the operator version, 
it would make more sense than ever to have it pluggable. But given that the 
backends are not pluggable, the pluggability wouldn't be as useful anymore. I'm 
ok with removing the support for now if it proves difficult to maintain it.
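
For readers unfamiliar with the mechanism under discussion, a generic sketch of service loading with `java.util.ServiceLoader`. This is not the operator's actual loading code; `SketchScaler`, `NoOpScaler`, and `ScalerLoader` are hypothetical. An implementation would normally be registered through a `META-INF/services` file on the classpath, which is what allows it to ship separately from the host.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

/** Hypothetical plugin interface that implementations would provide. */
interface SketchScaler {
    String name();
}

/** Fallback used when no implementation is registered on the classpath. */
final class NoOpScaler implements SketchScaler {
    @Override
    public String name() {
        return "noop";
    }
}

final class ScalerLoader {
    /** Returns the first registered implementation, or the no-op fallback. */
    static SketchScaler load() {
        Iterator<SketchScaler> providers = ServiceLoader.load(SketchScaler.class).iterator();
        return providers.hasNext() ? providers.next() : new NoOpScaler();
    }
}
```

The host depends only on the interface, so swapping the implementation means swapping a jar, not rebuilding the host.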
   
   
   
   
   
   
   





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-09 Thread via GitHub


gyfora commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752501151

   @1996fanrui Sorry, I merged a PR related to event triggering from the 
ScalingExecutor. I will try to avoid merging further autoscaler-related PRs 
until this is finalised. Please take care when rebasing.





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-09 Thread via GitHub


1996fanrui commented on PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#issuecomment-1752467497

   > The refactor also completely changes how the autoscaler module is loaded 
in the operator. Previously we intentionally loaded this as a service so it 
would be possible to potentially upgrade the autoscaler without the rest of the 
operator.
   > 
   > Now @1996fanrui you moved the operator related autoscaler classes in the 
main operator module resulting in a tight coupling.
   > 
   > @mxm how do you feel about this? Originally you were the one who 
introduced the service loader approach :)
   
   Sorry for breaking this; I wasn't aware of the background. Thanks to @gyfora 
for pointing it out.
   
   If it's still needed, I can restore the 
`flink-kubernetes-operator-autoscaler` module and move all 
kubernetes-autoscaler related classes into it. WDYT?
   
   cc @mxm 





Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-03 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343674332


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DefaultJobAutoScalerContext.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+/** The default job autoscaler context, the jobKey is JobID. */
+public class DefaultJobAutoScalerContext extends 
AbstractJobAutoScalerContext {

Review Comment:
   Let's delete this class and just have a non-abstract 
`JobAutoScalerContext`.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AbstractJobAutoScalerContext.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+/**
+ * The abstract job autoscaler context.
+ *
+ * @param <KEY> The job key.
+ */
+public abstract class AbstractJobAutoScalerContext<KEY> implements JobAutoScalerContext<KEY> {

Review Comment:
   I would simply rename this to `JobAutoScalerContext`, delete the 
constructor, make all fields final, and put `@Value` on it to generate 
the getters, `toString`, etc.
   
   We don't need to have interfaces for everything, too much boilerplate
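
As a rough illustration of what the `@Value` suggestion saves, this is approximately the code Lombok would generate for a two-field value class, written out by hand. `SketchJobContext` and its fields are hypothetical and far smaller than the real context. Lombok's actual output differs in details.

```java
import java.util.Objects;

/** Hand-expanded approximation of a Lombok @Value class with two fields. */
final class SketchJobContext<KEY> {
    private final KEY jobKey;
    private final String jobId;

    public SketchJobContext(KEY jobKey, String jobId) {
        this.jobKey = jobKey;
        this.jobId = jobId;
    }

    public KEY getJobKey() {
        return jobKey;
    }

    public String getJobId() {
        return jobId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchJobContext)) {
            return false;
        }
        SketchJobContext<?> other = (SketchJobContext<?>) o;
        return Objects.equals(jobKey, other.jobKey) && Objects.equals(jobId, other.jobId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobKey, jobId);
    }

    @Override
    public String toString() {
        return "SketchJobContext(jobKey=" + jobKey + ", jobId=" + jobId + ")";
    }
}
```

With `@Value`, only the class declaration and the two field declarations would need to be written.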



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+
+/**
+ * The job autoscaler context, it includes all details related to the current job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+public interface JobAutoScalerContext<KEY> {

Review Comment:
   Let's delete the interface for now (see my previous comment). We can re-add 
it later if necessary, but I doubt we will need to.



##

Re: [PR] [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces [flink-kubernetes-operator]

2023-10-03 Thread via GitHub


gyfora commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1343663466


##
flink-autoscaler/pom.xml:
##
@@ -45,6 +45,32 @@ under the License.
 provided
 
 
+
+org.projectlombok
+lombok
+${lombok.version}
+provided
+
+
+
+org.junit.jupiter
+junit-jupiter-params
+test
+
+
+